Skip to content

Commit e5987b6

Browse files
committed
[TS] Allow brotli to be specified for compression and reorganize some websocket stuff
1 parent c5743cf commit e5987b6

7 files changed

Lines changed: 129 additions & 141 deletions

File tree

crates/bindings-typescript/src/sdk/db_connection_builder.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type {
88
} from '../';
99
import { ensureMinimumVersionOrThrow } from './version';
1010
import { WebsocketDecompressAdapter } from './websocket_decompress_adapter';
11+
import type { WebSocketFactory } from './ws';
1112

1213
/**
1314
* The database client connection to a SpacetimeDB server.
@@ -23,10 +24,10 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
2324
#identity?: Identity;
2425
#token?: string;
2526
#emitter: EventEmitter<ConnectionEvent> = new EventEmitter();
26-
#compression: 'gzip' | 'none' = 'gzip';
27+
#compression: 'gzip' | 'brotli' | 'none' = 'gzip';
2728
#lightMode: boolean = false;
2829
#confirmedReads?: boolean;
29-
#createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn;
30+
#createWSFn: WebSocketFactory;
3031

3132
/**
3233
* Creates a new `DbConnectionBuilder` database client and set the initial parameters.
@@ -42,7 +43,7 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
4243
config: DbConnectionConfig<RemoteModuleOf<DbConnection>>
4344
) => DbConnection
4445
) {
45-
this.#createWSFn = WebsocketDecompressAdapter.createWebSocketFn;
46+
this.#createWSFn = WebsocketDecompressAdapter.openWebSocket;
4647
}
4748

4849
/**
@@ -82,9 +83,7 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
8283
return this;
8384
}
8485

85-
withWSFn(
86-
createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn
87-
): this {
86+
withWSFn(createWSFn: WebSocketFactory): this {
8887
this.#createWSFn = createWSFn;
8988
return this;
9089
}
@@ -94,7 +93,17 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
9493
*
9594
* @param compression The compression algorithm to use for the connection.
9695
*/
97-
withCompression(compression: 'gzip' | 'none'): this {
96+
withCompression(compression: 'gzip' | 'brotli' | 'none'): this {
97+
if (compression === 'brotli') {
98+
try {
99+
new DecompressionStream('brotli' as CompressionFormat);
100+
} catch (e) {
101+
throw new TypeError(
102+
`Brotli compression is not supported by the runtime. Please choose a different compression method.`,
103+
{ cause: e }
104+
);
105+
}
106+
}
98107
this.#compression = compression;
99108
return this;
100109
}

crates/bindings-typescript/src/sdk/db_connection_impl.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,6 @@ import {
3737
type PendingCallback,
3838
type TableUpdate as CacheTableUpdate,
3939
} from './table_cache.ts';
40-
import {
41-
WebsocketDecompressAdapter,
42-
type WebsocketAdapter,
43-
} from './websocket_decompress_adapter.ts';
4440
import {
4541
SubscriptionBuilderImpl,
4642
SubscriptionHandleImpl,
@@ -60,6 +56,7 @@ import type { ProceduresView } from './procedures.ts';
6056
import type { Values } from '../lib/type_util.ts';
6157
import type { TransactionUpdate } from './client_api/types.ts';
6258
import { InternalError, SenderError } from '../lib/errors.ts';
59+
import type { WebSocketAdapter, WebSocketFactory } from './ws.ts';
6360

6461
export {
6562
DbConnectionBuilder,
@@ -89,8 +86,8 @@ export type DbConnectionConfig<RemoteModule extends UntypedRemoteModule> = {
8986
identity?: Identity;
9087
token?: string;
9188
emitter: EventEmitter<ConnectionEvent>;
92-
createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn;
93-
compression: 'gzip' | 'none';
89+
createWSFn: WebSocketFactory;
90+
compression: 'gzip' | 'brotli' | 'none';
9491
lightMode: boolean;
9592
confirmedReads?: boolean;
9693
remoteModule: RemoteModule;
@@ -173,8 +170,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
173170
// private fields.
174171
// We use them in testing.
175172
private clientCache: ClientCache<RemoteModule>;
176-
private ws?: WebsocketAdapter;
177-
private wsPromise: Promise<WebsocketAdapter | undefined>;
173+
private ws?: WebSocketAdapter;
174+
private wsPromise: Promise<WebSocketAdapter | undefined>;
178175

179176
constructor({
180177
uri,
@@ -541,7 +538,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
541538
return this.#mergeTableUpdates(updates);
542539
}
543540

544-
#flushOutboundQueue(wsResolved: WebsocketAdapter): void {
541+
#flushOutboundQueue(wsResolved: WebSocketAdapter): void {
545542
const pending = this.#outboundQueue.splice(0);
546543
for (const message of pending) {
547544
wsResolved.send(message);
Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
export async function decompress(
2-
buffer: Uint8Array,
3-
// Leaving it here to expand to brotli when it lands in the browsers and NodeJS
4-
type: 'gzip',
2+
buffer: Uint8Array<ArrayBuffer>,
3+
type: CompressionFormat,
54
chunkSize: number = 128 * 1024 // 128KB
65
): Promise<Uint8Array> {
76
// Create a single ReadableStream to handle chunks
87
let offset = 0;
9-
const readableStream = new ReadableStream({
8+
const readableStream = new ReadableStream<BufferSource>({
109
pull(controller) {
1110
if (offset < buffer.length) {
1211
// Slice a chunk of the buffer and enqueue it
@@ -29,24 +28,6 @@ export async function decompress(
2928
const decompressedStream = readableStream.pipeThrough(decompressionStream);
3029

3130
// Collect the decompressed chunks efficiently
32-
const reader = decompressedStream.getReader();
33-
const chunks: Uint8Array[] = [];
34-
let totalLength = 0;
35-
let result: any;
36-
37-
while (!(result = await reader.read()).done) {
38-
chunks.push(result.value);
39-
totalLength += result.value.length;
40-
}
41-
42-
// Allocate a single Uint8Array for the decompressed data
43-
const decompressedArray = new Uint8Array(totalLength);
44-
let chunkOffset = 0;
45-
46-
for (const chunk of chunks) {
47-
decompressedArray.set(chunk, chunkOffset);
48-
chunkOffset += chunk.length;
49-
}
50-
51-
return decompressedArray;
31+
const chunks = await Array.fromAsync(decompressedStream);
32+
return new Blob(chunks).bytes();
5233
}
Lines changed: 11 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
import { decompress } from './decompress';
2-
import { resolveWS } from './ws';
2+
import { openWebSocket, type WebSocketAdapter, type WebSocketArgs } from './ws';
33

4-
export interface WebsocketAdapter {
5-
send(msg: Uint8Array): void;
6-
close(): void;
7-
8-
set onclose(handler: (ev: CloseEvent) => void);
9-
set onopen(handler: () => void);
10-
set onmessage(handler: (msg: { data: Uint8Array }) => void);
11-
set onerror(handler: (msg: ErrorEvent) => void);
12-
}
13-
14-
export class WebsocketDecompressAdapter implements WebsocketAdapter {
4+
export class WebsocketDecompressAdapter implements WebSocketAdapter {
155
set onclose(handler: (ev: CloseEvent) => void) {
166
this.#ws.onclose = handler;
177
}
@@ -30,16 +20,17 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter {
3020

3121
#ws: WebSocket;
3222

33-
async #decompress(buffer: Uint8Array): Promise<Uint8Array> {
23+
async #decompress(buffer: Uint8Array<ArrayBuffer>): Promise<Uint8Array> {
3424
const tag = buffer[0];
3525
const data = buffer.subarray(1);
3626
switch (tag) {
3727
case 0:
3828
return data;
3929
case 1:
40-
throw new Error(
41-
'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.'
42-
);
30+
// Some runtimes support brotli, but it's not yet defined in `lib.dom.d.ts`.
31+
// We assert runtime support in `DbConnectionBuilder.withCompression`, so
32+
// this cast is safe.
33+
return await decompress(data, 'brotli' as CompressionFormat);
4334
case 2:
4435
return await decompress(data, 'gzip');
4536
default:
@@ -58,68 +49,12 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter {
5849
}
5950

6051
constructor(ws: WebSocket) {
61-
ws.binaryType = 'arraybuffer';
62-
6352
this.#ws = ws;
6453
}
6554

66-
static async createWebSocketFn({
67-
url,
68-
nameOrAddress,
69-
wsProtocol,
70-
authToken,
71-
compression,
72-
lightMode,
73-
confirmedReads,
74-
}: {
75-
url: URL;
76-
wsProtocol: string;
77-
nameOrAddress: string;
78-
authToken?: string;
79-
compression: 'gzip' | 'none';
80-
lightMode: boolean;
81-
confirmedReads?: boolean;
82-
}): Promise<WebsocketDecompressAdapter> {
83-
const headers = new Headers();
84-
85-
const WS = await resolveWS();
86-
87-
// We swap our original token to a shorter-lived token
88-
// to avoid sending the original via query params.
89-
let temporaryAuthToken: string | undefined = undefined;
90-
if (authToken) {
91-
headers.set('Authorization', `Bearer ${authToken}`);
92-
const tokenUrl = new URL('v1/identity/websocket-token', url);
93-
tokenUrl.protocol = url.protocol === 'wss:' ? 'https:' : 'http:';
94-
95-
const response = await fetch(tokenUrl, { method: 'POST', headers });
96-
if (response.ok) {
97-
const { token } = await response.json();
98-
temporaryAuthToken = token;
99-
} else {
100-
return Promise.reject(
101-
new Error(`Failed to verify token: ${response.statusText}`)
102-
);
103-
}
104-
}
105-
106-
const databaseUrl = new URL(`v1/database/${nameOrAddress}/subscribe`, url);
107-
if (temporaryAuthToken) {
108-
databaseUrl.searchParams.set('token', temporaryAuthToken);
109-
}
110-
databaseUrl.searchParams.set(
111-
'compression',
112-
compression === 'gzip' ? 'Gzip' : 'None'
113-
);
114-
if (lightMode) {
115-
databaseUrl.searchParams.set('light', 'true');
116-
}
117-
if (confirmedReads !== undefined) {
118-
databaseUrl.searchParams.set('confirmed', confirmedReads.toString());
119-
}
120-
121-
const ws = new WS(databaseUrl.toString(), wsProtocol);
122-
123-
return new WebsocketDecompressAdapter(ws);
55+
static async openWebSocket(
56+
args: WebSocketArgs
57+
): Promise<WebsocketDecompressAdapter> {
58+
return new this(await openWebSocket(args));
12459
}
12560
}

crates/bindings-typescript/src/sdk/websocket_test_adapter.ts

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { BinaryReader, BinaryWriter } from '../';
22
import { ClientMessage, ServerMessage } from './client_api/types';
3-
import type { WebsocketAdapter } from './websocket_decompress_adapter';
3+
import type { WebSocketAdapter, WebSocketFactory } from './ws';
44

5-
class WebsocketTestAdapter implements WebsocketAdapter {
5+
class WebsocketTestAdapter implements WebSocketAdapter {
66
onclose: any;
77
// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
88
onopen!: () => void;
@@ -47,18 +47,7 @@ class WebsocketTestAdapter implements WebsocketAdapter {
4747
this.onmessage({ data: rawBytes });
4848
}
4949

50-
async createWebSocketFn(_args: {
51-
url: URL;
52-
wsProtocol: string;
53-
nameOrAddress: string;
54-
authToken?: string;
55-
compression: 'gzip' | 'none';
56-
lightMode: boolean;
57-
confirmedReads?: boolean;
58-
}): Promise<WebsocketTestAdapter> {
59-
return this;
60-
}
50+
openWebSocket: WebSocketFactory = async () => this;
6151
}
6252

63-
export type { WebsocketTestAdapter };
6453
export default WebsocketTestAdapter;

crates/bindings-typescript/src/sdk/ws.ts

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { stdbLogger } from './logger';
22

3-
export async function resolveWS(): Promise<typeof WebSocket> {
3+
async function resolveWS(): Promise<typeof WebSocket> {
44
// Browser or Node >= 22 (or any env that exposes global WebSocket)
5-
if (typeof (globalThis as any).WebSocket !== 'undefined') {
6-
return (globalThis as any).WebSocket as typeof WebSocket;
5+
if (typeof WebSocket !== 'undefined') {
6+
return WebSocket;
77
}
88

99
// Node without a global WebSocket: lazily load undici's polyfill.
@@ -25,3 +25,80 @@ export async function resolveWS(): Promise<typeof WebSocket> {
2525
throw err;
2626
}
2727
}
28+
29+
export interface WebSocketAdapter {
30+
send(msg: Uint8Array): void;
31+
close(): void;
32+
33+
set onclose(handler: (ev: CloseEvent) => void);
34+
set onopen(handler: () => void);
35+
set onmessage(handler: (msg: { data: Uint8Array }) => void);
36+
set onerror(handler: (msg: ErrorEvent) => void);
37+
}
38+
39+
export interface WebSocketArgs {
40+
url: URL;
41+
wsProtocol: string;
42+
nameOrAddress: string;
43+
authToken?: string;
44+
compression: 'gzip' | 'brotli' | 'none';
45+
lightMode: boolean;
46+
confirmedReads?: boolean;
47+
}
48+
export type WebSocketFactory = (
49+
args: WebSocketArgs
50+
) => Promise<WebSocketAdapter>;
51+
52+
/**
53+
* Open a WebSocket to the database specified by the given `WebSocketArgs`.
54+
* @returns a WebSocket with `binaryType` set to `arraybuffer`.
55+
*/
56+
export async function openWebSocket({
57+
url,
58+
nameOrAddress,
59+
wsProtocol,
60+
authToken,
61+
compression,
62+
lightMode,
63+
confirmedReads,
64+
}: WebSocketArgs): Promise<WebSocket> {
65+
const headers = new Headers();
66+
67+
const WS = await resolveWS();
68+
69+
// We swap our original token to a shorter-lived token
70+
// to avoid sending the original via query params.
71+
let temporaryAuthToken: string | undefined;
72+
if (authToken) {
73+
headers.set('Authorization', `Bearer ${authToken}`);
74+
const tokenUrl = new URL('v1/identity/websocket-token', url);
75+
tokenUrl.protocol = url.protocol === 'wss:' ? 'https:' : 'http:';
76+
77+
const response = await fetch(tokenUrl, { method: 'POST', headers });
78+
if (response.ok) {
79+
const { token } = await response.json();
80+
temporaryAuthToken = token;
81+
} else {
82+
throw new Error(`Failed to verify token: ${response.statusText}`);
83+
}
84+
}
85+
86+
const databaseUrl = new URL(`v1/database/${nameOrAddress}/subscribe`, url);
87+
if (temporaryAuthToken) {
88+
databaseUrl.searchParams.set('token', temporaryAuthToken);
89+
}
90+
databaseUrl.searchParams.set(
91+
'compression',
92+
{ gzip: 'Gzip', brotli: 'Brotli', none: 'None' }[compression] ?? 'None'
93+
);
94+
if (lightMode) {
95+
databaseUrl.searchParams.set('light', 'true');
96+
}
97+
if (confirmedReads !== undefined) {
98+
databaseUrl.searchParams.set('confirmed', confirmedReads.toString());
99+
}
100+
101+
const ws = new WS(databaseUrl.toString(), wsProtocol);
102+
ws.binaryType = 'arraybuffer';
103+
return ws;
104+
}

0 commit comments

Comments
 (0)