diff --git a/tests/XFTPWebTests.hs b/tests/XFTPWebTests.hs index 4f7a64bc7..fa402ad6f 100644 --- a/tests/XFTPWebTests.hs +++ b/tests/XFTPWebTests.hs @@ -56,6 +56,10 @@ import qualified Simplex.Messaging.Crypto.File as CF xftpWebDir :: FilePath xftpWebDir = "xftp-web" +-- | Redirect console.log/warn to stderr so library debug output doesn't pollute stdout binary data. +redirectConsole :: String +redirectConsole = "console.log = console.warn = (...a) => process.stderr.write(a.map(String).join(' ') + '\\n');" + -- | Run an inline ES module script via node, return stdout as ByteString. callNode :: String -> IO B.ByteString callNode script = do @@ -63,7 +67,7 @@ callNode script = do let nodeEnv = ("NODE_TLS_REJECT_UNAUTHORIZED", "0") : baseEnv (_, Just hout, Just herr, ph) <- createProcess - (proc "node" ["--input-type=module", "-e", script]) + (proc "node" ["--input-type=module", "-e", redirectConsole <> script]) { std_out = CreatePipe, std_err = CreatePipe, cwd = Just xftpWebDir, @@ -2900,14 +2904,15 @@ pingTest cfg caFile = do callNode $ "import sodium from 'libsodium-wrappers-sumo';\ \import * as Addr from './dist/protocol/address.js';\ - \import {connectXFTP, pingXFTP, closeXFTP} from './dist/client.js';\ + \import {newXFTPAgent, closeXFTPAgent} from './dist/client.js';\ + \import {pingXFTP} from './dist/client.js';\ \await sodium.ready;\ \const server = Addr.parseXFTPServer('" <> addr <> "');\ - \const c = await connectXFTP(server);\ - \await pingXFTP(c);\ - \closeXFTP(c);" + \const agent = newXFTPAgent();\ + \await pingXFTP(agent, server);\ + \closeXFTPAgent(agent);" <> jsOut "new Uint8Array([1])" result `shouldBe` B.pack [1] @@ -2925,13 +2930,13 @@ fullRoundTripTest cfg caFile = do \import * as Addr from './dist/protocol/address.js';\ \import * as K from './dist/crypto/keys.js';\ \import {sha256} from './dist/crypto/digest.js';\ - \import {connectXFTP, createXFTPChunk, uploadXFTPChunk, downloadXFTPChunk,\ - \ addXFTPRecipients, deleteXFTPChunk, closeXFTP} from './dist/client.js';\ + \import {newXFTPAgent, closeXFTPAgent, createXFTPChunk, uploadXFTPChunk, downloadXFTPChunk,\ + \ addXFTPRecipients, deleteXFTPChunk} from './dist/client.js';\ \await sodium.ready;\ \const server = Addr.parseXFTPServer('" <> addr <> "');\ - \const c = await connectXFTP(server);\ + \const agent = newXFTPAgent();\ \const sndKp = K.generateEd25519KeyPair();\ \const rcvKp1 = K.generateEd25519KeyPair();\ \const rcvKp2 = K.generateEd25519KeyPair();\ @@ -2943,16 +2948,16 @@ fullRoundTripTest cfg caFile = do \ digest\ \};\ \const rcvKeys = [K.encodePubKeyEd25519(rcvKp1.publicKey)];\ - \const {senderId, recipientIds} = await createXFTPChunk(c, sndKp.privateKey, file, rcvKeys, null);\ - \await uploadXFTPChunk(c, sndKp.privateKey, senderId, chunkData);\ - \const dl1 = await downloadXFTPChunk(c, rcvKp1.privateKey, recipientIds[0], digest);\ + \const {senderId, recipientIds} = await createXFTPChunk(agent, server, sndKp.privateKey, file, rcvKeys, null);\ + \await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData);\ + \const dl1 = await downloadXFTPChunk(agent, server, rcvKp1.privateKey, recipientIds[0], digest);\ \const match1 = dl1.length === chunkData.length && dl1.every((b, i) => b === chunkData[i]);\ - \const newIds = await addXFTPRecipients(c, sndKp.privateKey, senderId,\ + \const newIds = await addXFTPRecipients(agent, server, sndKp.privateKey, senderId,\ \ [K.encodePubKeyEd25519(rcvKp2.publicKey)]);\ - \const dl2 = await downloadXFTPChunk(c, rcvKp2.privateKey, newIds[0], digest);\ + \const dl2 = await downloadXFTPChunk(agent, server, rcvKp2.privateKey, newIds[0], digest);\ \const match2 = dl2.length === chunkData.length && dl2.every((b, i) => b === chunkData[i]);\ - \await deleteXFTPChunk(c, sndKp.privateKey, senderId);\ - \closeXFTP(c);" + \await deleteXFTPChunk(agent, server, sndKp.privateKey, senderId);\ + \closeXFTPAgent(agent);" <> jsOut "new Uint8Array([match1 ? 1 : 0, match2 ? 1 : 0])" result `shouldBe` B.pack [1, 1] @@ -3012,7 +3017,7 @@ agentUploadDownloadTest cfg caFile = do \const agent = Agent.newXFTPAgent();\ \const originalData = new Uint8Array(crypto.randomBytes(50000));\ \const encrypted = Agent.encryptFileForUpload(originalData, 'test-file.bin');\ - \const {rcvDescription, sndDescription, uri} = await Agent.uploadFile(agent, server, encrypted);\ + \const {rcvDescription, sndDescription, uri} = await Agent.uploadFile(agent, [server], encrypted);\ \const fd = Agent.decodeDescriptionURI(uri);\ \const {header, content} = await Agent.downloadFile(agent, fd);\ \Agent.closeXFTPAgent(agent);\ @@ -3045,7 +3050,7 @@ agentDeleteTest cfg caFile = do \const agent = Agent.newXFTPAgent();\ \const originalData = new Uint8Array(crypto.randomBytes(50000));\ \const encrypted = Agent.encryptFileForUpload(originalData, 'del-test.bin');\ - \const {rcvDescription, sndDescription} = await Agent.uploadFile(agent, server, encrypted);\ + \const {rcvDescription, sndDescription} = await Agent.uploadFile(agent, [server], encrypted);\ \await Agent.deleteFile(agent, sndDescription);\ \let deleted = 0;\ \try {\ @@ -3077,7 +3082,7 @@ agentRedirectTest cfg caFile = do \const agent = Agent.newXFTPAgent();\ \const originalData = new Uint8Array(crypto.randomBytes(100000));\ \const encrypted = Agent.encryptFileForUpload(originalData, 'redirect-test.bin');\ - \const {rcvDescription, uri} = await Agent.uploadFile(agent, server, encrypted, {redirectThreshold: 50});\ + \const {rcvDescription, uri} = await Agent.uploadFile(agent, [server], encrypted, {redirectThreshold: 50});\ \const fd = Agent.decodeDescriptionURI(uri);\ \const hasRedirect = fd.redirect !== null ? 1 : 0;\ \const {header, content} = await Agent.downloadFile(agent, fd);\ @@ -3113,7 +3118,7 @@ tsUploadHaskellDownloadTest cfg caFile = do \const agent = Agent.newXFTPAgent();\ \const originalData = new Uint8Array(crypto.randomBytes(50000));\ \const encrypted = Agent.encryptFileForUpload(originalData, 'ts-to-hs.bin');\ - \const {rcvDescription} = await Agent.uploadFile(agent, server, encrypted);\ + \const {rcvDescription} = await Agent.uploadFile(agent, [server], encrypted);\ \Agent.closeXFTPAgent(agent);\ \const yaml = encodeFileDescription(rcvDescription);" <> jsOut2 "Buffer.from(yaml)" "Buffer.from(originalData)" @@ -3148,7 +3153,7 @@ tsUploadRedirectHaskellDownloadTest cfg caFile = do \const agent = Agent.newXFTPAgent();\ \const originalData = new Uint8Array(crypto.randomBytes(100000));\ \const encrypted = Agent.encryptFileForUpload(originalData, 'ts-redirect-to-hs.bin');\ - \const {rcvDescription} = await Agent.uploadFile(agent, server, encrypted, {redirectThreshold: 50});\ + \const {rcvDescription} = await Agent.uploadFile(agent, [server], encrypted, {redirectThreshold: 50});\ \Agent.closeXFTPAgent(agent);\ \const yaml = encodeFileDescription(rcvDescription);" <> jsOut2 "Buffer.from(yaml)" "Buffer.from(originalData)" diff --git a/xftp-web/src/agent.ts b/xftp-web/src/agent.ts index 421ec5345..c8c892234 100644 --- a/xftp-web/src/agent.ts +++ b/xftp-web/src/agent.ts @@ -104,10 +104,11 @@ export interface UploadOptions { export async function uploadFile( agent: XFTPClientAgent, - server: XFTPServer, + servers: XFTPServer[], encrypted: EncryptedFileMetadata, options?: UploadOptions ): Promise { + if (servers.length === 0) throw new Error("uploadFile: servers list is empty") const {onProgress, redirectThreshold, readChunk: readChunkOpt} = options ?? {} const readChunk: (offset: number, size: number) => Promise = readChunkOpt ? readChunkOpt @@ -116,41 +117,56 @@ export async function uploadFile( : () => { throw new Error("uploadFile: readChunk required when encData is absent") }) const total = encrypted.chunkSizes.reduce((a, b) => a + b, 0) const specs = prepareChunkSpecs(encrypted.chunkSizes) - const sentChunks: SentChunk[] = [] + + // Pre-assign servers and group by server for parallel upload + const chunkJobs = specs.map((spec, i) => ({ + index: i, + spec, + server: servers[Math.floor(Math.random() * servers.length)] + })) + const byServer = new Map() + for (const job of chunkJobs) { + const key = formatXFTPServer(job.server) + if (!byServer.has(key)) byServer.set(key, []) + byServer.get(key)!.push(job) + } + + // Upload groups in parallel, sequential within each group + const sentChunks: SentChunk[] = new Array(specs.length) let uploaded = 0 - for (let i = 0; i < specs.length; i++) { - const spec = specs[i] - const chunkNo = i + 1 - const sndKp = generateEd25519KeyPair() - const rcvKp = generateEd25519KeyPair() - const chunkData = await readChunk(spec.chunkOffset, spec.chunkSize) - const chunkDigest = getChunkDigest(chunkData) - console.log(`[AGENT-DBG] upload chunk=${chunkNo} offset=${spec.chunkOffset} size=${spec.chunkSize} digest=${_dbgHex(chunkDigest, 32)} data[0..8]=${_dbgHex(chunkData)} data[-8..]=${_dbgHex(chunkData.slice(-8))}`) - const fileInfo: FileInfo = { - sndKey: encodePubKeyEd25519(sndKp.publicKey), - size: spec.chunkSize, - digest: chunkDigest + await Promise.all([...byServer.values()].map(async (jobs) => { + for (const {index, spec, server} of jobs) { + const chunkNo = index + 1 + const sndKp = generateEd25519KeyPair() + const rcvKp = generateEd25519KeyPair() + const chunkData = await readChunk(spec.chunkOffset, spec.chunkSize) + const chunkDigest = getChunkDigest(chunkData) + const fileInfo: FileInfo = { + sndKey: encodePubKeyEd25519(sndKp.publicKey), + size: spec.chunkSize, + digest: chunkDigest + } + const rcvKeysForChunk = [encodePubKeyEd25519(rcvKp.publicKey)] + const {senderId, recipientIds} = await createXFTPChunk( + agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk + ) + await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData) + sentChunks[index] = { + chunkNo, senderId, senderKey: sndKp.privateKey, + recipientId: recipientIds[0], recipientKey: rcvKp.privateKey, + chunkSize: spec.chunkSize, digest: chunkDigest, server + } + uploaded += spec.chunkSize + onProgress?.(uploaded, total) } - const rcvKeysForChunk = [encodePubKeyEd25519(rcvKp.publicKey)] - const {senderId, recipientIds} = await createXFTPChunk( - agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk - ) - await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData) - sentChunks.push({ - chunkNo, senderId, senderKey: sndKp.privateKey, - recipientId: recipientIds[0], recipientKey: rcvKp.privateKey, - chunkSize: spec.chunkSize, digest: chunkDigest, server - }) - uploaded += spec.chunkSize - onProgress?.(uploaded, total) - } + })) const rcvDescription = buildDescription("recipient", encrypted, sentChunks) const sndDescription = buildDescription("sender", encrypted, sentChunks) let uri = encodeDescriptionURI(rcvDescription) let finalRcvDescription = rcvDescription const threshold = redirectThreshold ?? DEFAULT_REDIRECT_THRESHOLD if (uri.length > threshold && sentChunks.length > 1) { - finalRcvDescription = await uploadRedirectDescription(agent, server, rcvDescription) + finalRcvDescription = await uploadRedirectDescription(agent, servers, rcvDescription) uri = encodeDescriptionURI(finalRcvDescription) } return {rcvDescription: finalRcvDescription, sndDescription, uri} @@ -185,7 +201,7 @@ function buildDescription( async function uploadRedirectDescription( agent: XFTPClientAgent, - server: XFTPServer, + servers: XFTPServer[], innerFd: FileDescription ): Promise { const yaml = encodeFileDescription(innerFd) @@ -196,6 +212,7 @@ async function uploadRedirectDescription( for (let i = 0; i < specs.length; i++) { const spec = specs[i] const chunkNo = i + 1 + const server = servers[Math.floor(Math.random() * servers.length)] const sndKp = generateEd25519KeyPair() const rcvKp = generateEd25519KeyPair() const chunkData = enc.encData.subarray(spec.chunkOffset, spec.chunkOffset + spec.chunkSize) diff --git a/xftp-web/src/client.ts b/xftp-web/src/client.ts index 1384147fb..3d0429023 100644 --- a/xftp-web/src/client.ts +++ b/xftp-web/src/client.ts @@ -240,7 +240,7 @@ export function closeXFTPAgent(agent: XFTPClientAgent): void { // -- Connect + handshake -export async function connectXFTP(server: XFTPServer, config?: Partial): Promise { +async function connectXFTP(server: XFTPServer, config?: Partial): Promise { const cfg: TransportConfig = {...DEFAULT_TRANSPORT_CONFIG, ...config} const baseUrl = "https://" + server.host + ":" + server.port const transport = await createTransport(baseUrl, cfg) @@ -441,8 +441,3 @@ export async function pingXFTP(agent: XFTPClientAgent, server: XFTPServer): Prom if (response.type !== "FRPong") throw new Error("unexpected response: " + response.type) } -// -- Close - -export function closeXFTP(c: XFTPClient): void { - c.transport.close() -} diff --git a/xftp-web/src/crypto/digest.ts b/xftp-web/src/crypto/digest.ts index 7ef742c08..95bf2679c 100644 --- a/xftp-web/src/crypto/digest.ts +++ b/xftp-web/src/crypto/digest.ts @@ -16,7 +16,7 @@ export function sha512(data: Uint8Array): Uint8Array { // Internally segments chunks larger than 4MB to limit peak WASM memory usage. export function sha512Streaming(chunks: Iterable): Uint8Array { const SEG = 4 * 1024 * 1024 - const state = sodium.crypto_hash_sha512_init() + const state = sodium.crypto_hash_sha512_init() as unknown as sodium.StateAddress for (const chunk of chunks) { for (let off = 0; off < chunk.length; off += SEG) { sodium.crypto_hash_sha512_update(state, chunk.subarray(off, Math.min(off + SEG, chunk.length))) diff --git a/xftp-web/src/crypto/keys.ts b/xftp-web/src/crypto/keys.ts index 95664a788..21b217906 100644 --- a/xftp-web/src/crypto/keys.ts +++ b/xftp-web/src/crypto/keys.ts @@ -1,6 +1,7 @@ // Key generation, signing, DH -- Simplex.Messaging.Crypto (Ed25519/X25519/Ed448 functions). import sodium from "libsodium-wrappers-sumo" +await sodium.ready import {ed448} from "@noble/curves/ed448" import {sha256} from "./digest.js" import {concatBytes} from "../protocol/encoding.js" diff --git a/xftp-web/test/browser.test.ts b/xftp-web/test/browser.test.ts index 26a9670ca..0596c9b9a 100644 --- a/xftp-web/test/browser.test.ts +++ b/xftp-web/test/browser.test.ts @@ -10,7 +10,7 @@ test('browser upload + download round-trip', async () => { const data = new Uint8Array(50000) crypto.getRandomValues(data) const encrypted = encryptFileForUpload(data, 'test.bin') - const {rcvDescription} = await uploadFile(agent, server, encrypted) + const {rcvDescription} = await uploadFile(agent, [server], encrypted) const {content} = await downloadFile(agent, rcvDescription) expect(content).toEqual(data) } finally { diff --git a/xftp-web/vite.config.ts b/xftp-web/vite.config.ts index fd2cdd651..539ff18c1 100644 --- a/xftp-web/vite.config.ts +++ b/xftp-web/vite.config.ts @@ -85,11 +85,19 @@ export default defineConfig(({mode}) => { return { root: 'web', - build: {outDir: resolve(__dirname, 'dist-web'), target: 'esnext'}, + build: { + outDir: resolve(__dirname, 'dist-web'), + emptyOutDir: true, + target: 'esnext', + chunkSizeWarningLimit: 1200, + rollupOptions: { + external: ['node:http2', 'url'], + }, + }, server: httpsConfig ? {https: httpsConfig} : {}, preview: {host: true, https: false}, define, - worker: {format: 'es' as const}, + worker: {format: 'es' as const, rollupOptions: {external: ['node:http2', 'url']}}, plugins, } }) diff --git a/xftp-web/web/crypto-backend.ts b/xftp-web/web/crypto-backend.ts index eb4155128..e1ed52b84 100644 --- a/xftp-web/web/crypto-backend.ts +++ b/xftp-web/web/crypto-backend.ts @@ -28,10 +28,27 @@ class WorkerBackend implements CryptoBackend { private pending = new Map() private nextId = 1 private progressCb: ((done: number, total: number) => void) | null = null + private ready: Promise constructor() { this.worker = new Worker(new URL('./crypto.worker.ts', import.meta.url), {type: 'module'}) - this.worker.onmessage = (e) => this.handleMessage(e.data) + let rejectReady: (e: Error) => void + this.ready = new Promise((resolve, reject) => { + rejectReady = reject + this.worker.onmessage = (e) => { + if (e.data?.type === 'ready') { + this.worker.onmessage = (e) => this.handleMessage(e.data) + resolve() + } else { + reject(new Error('Worker: unexpected first message')) + } + } + }) + this.worker.onerror = (e) => { + rejectReady(new Error('Worker failed to load: ' + e.message)) + for (const p of this.pending.values()) p.reject(new Error('Worker error: ' + e.message)) + this.pending.clear() + } } private handleMessage(msg: {id: number, type: string, [k: string]: any}) { @@ -49,7 +66,8 @@ class WorkerBackend implements CryptoBackend { } } - private send(msg: Record, transfer?: Transferable[]): Promise { + private async send(msg: Record, transfer?: Transferable[]): Promise { + await this.ready const id = this.nextId++ return new Promise((resolve, reject) => { this.pending.set(id, {resolve, reject}) diff --git a/xftp-web/web/crypto.worker.ts b/xftp-web/web/crypto.worker.ts index b9bc7e705..d9f61d025 100644 --- a/xftp-web/web/crypto.worker.ts +++ b/xftp-web/web/crypto.worker.ts @@ -265,9 +265,9 @@ async function handleCleanup(id: number) { // ── Message dispatch ──────────────────────────────────────────── self.onmessage = async (e: MessageEvent) => { - await initPromise const msg = e.data try { + await initPromise switch (msg.type) { case 'encrypt': await handleEncrypt(msg.id, msg.data, msg.fileName) @@ -311,3 +311,6 @@ const initPromise = (async () => { await sodium.ready await sweepStale() })() + +// Signal main thread that the worker is ready to receive messages +initPromise.then(() => self.postMessage({type: 'ready'}), () => {}) diff --git a/xftp-web/web/servers.ts b/xftp-web/web/servers.ts index e15516382..b9a67e5bf 100644 --- a/xftp-web/web/servers.ts +++ b/xftp-web/web/servers.ts @@ -10,7 +10,3 @@ const serverAddresses: string[] = __XFTP_SERVERS__ export function getServers(): XFTPServer[] { return serverAddresses.map(parseXFTPServer) } - -export function pickRandomServer(servers: XFTPServer[]): XFTPServer { - return servers[Math.floor(Math.random() * servers.length)] -} diff --git a/xftp-web/web/upload.ts b/xftp-web/web/upload.ts index 12a473cf9..0faa70c0a 100644 --- a/xftp-web/web/upload.ts +++ b/xftp-web/web/upload.ts @@ -1,5 +1,5 @@ import {createCryptoBackend} from './crypto-backend.js' -import {getServers, pickRandomServer} from './servers.js' +import {getServers} from './servers.js' import {createProgressRing} from './progress.js' import { newXFTPAgent, closeXFTPAgent, uploadFile, encodeDescriptionURI, @@ -131,8 +131,7 @@ export function initUpload(app: HTMLElement) { chunkSizes: encrypted.chunkSizes } const servers = getServers() - const server = pickRandomServer(servers) - const result = await uploadFile(agent, server, metadata, { + const result = await uploadFile(agent, servers, metadata, { readChunk: (off, sz) => backend.readChunk(off, sz), onProgress: (uploaded, total) => { ring.update(0.3 + (uploaded / total) * 0.7)