From 4c8ceda358970d09e2d9130d22edf596c43ce46f Mon Sep 17 00:00:00 2001 From: masnwilliams <43387599+masnwilliams@users.noreply.github.com> Date: Mon, 4 May 2026 00:51:48 +0000 Subject: [PATCH 1/5] feat: subscribe to managed auth state via SSE instead of polling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace 2s polling against /auth/connections/{id} with the existing /auth/connections/{id}/events SSE endpoint. The server only emits on status/step change, so submissions no longer race against a stale poll snapping the UI back to awaiting_input. Adds streamManagedAuthEvents to the api client (fetch + ReadableStream SSE reader so the Authorization header is honored). The hook keeps the initial GET to populate id/domain/profile_name, then merges each state event into the response and re-derives the UI state. Drops the post-submit polling delay hack — the optimistic submitting UI sticks until the server emits a real change. Co-Authored-By: Claude Opus 4.7 --- .changeset/sse-auth-events.md | 5 + packages/managed-auth-react/src/lib/api.ts | 132 +++++++++- .../src/session/useManagedAuthSession.ts | 248 +++++++++++------- 3 files changed, 284 insertions(+), 101 deletions(-) create mode 100644 .changeset/sse-auth-events.md diff --git a/.changeset/sse-auth-events.md b/.changeset/sse-auth-events.md new file mode 100644 index 0000000..49a1f60 --- /dev/null +++ b/.changeset/sse-auth-events.md @@ -0,0 +1,5 @@ +--- +"@onkernel/managed-auth-react": minor +--- + +Subscribe to managed auth state via the `/auth/connections/{id}/events` SSE endpoint instead of polling `/auth/connections/{id}` every 2s. Removes the post-submit race where the UI could briefly snap back to `awaiting_input` after submission. diff --git a/packages/managed-auth-react/src/lib/api.ts b/packages/managed-auth-react/src/lib/api.ts index 3f12960..9431eaa 100644 --- a/packages/managed-auth-react/src/lib/api.ts +++ b/packages/managed-auth-react/src/lib/api.ts @@ -1,4 +1,13 @@ -import type { ManagedAuthResponse, MFAType } from "./types"; +import type { + DiscoveredField, + FlowStatus, + FlowStep, + ManagedAuthResponse, + MFAOption, + MFAType, + SSOButton, + SignInOption, +} from "./types"; export interface ApiClientOptions { baseUrl?: string; @@ -156,3 +165,124 @@ export function submitSignInOption( options, ); } + +export interface ManagedAuthStateEventData { + event: "managed_auth_state"; + timestamp: string; + flow_status: FlowStatus; + flow_step: FlowStep; + flow_type?: "LOGIN" | "REAUTH"; + discovered_fields?: DiscoveredField[]; + mfa_options?: MFAOption[]; + sign_in_options?: SignInOption[]; + pending_sso_buttons?: SSOButton[]; + external_action_message?: string; + website_error?: string; + error_message?: string; + error_code?: string; + post_login_url?: string; + live_view_url?: string; + hosted_url?: string; +} + +export interface ManagedAuthStreamHandlers { + onState: (event: ManagedAuthStateEventData) => void; + onError: (err: ManagedAuthApiError) => void; + onClose: () => void; +} + +interface ParsedSSEMessage { + event?: string; + data: string; +} + +function parseSSEMessage(raw: string): ParsedSSEMessage | null { + if (!raw.trim()) return null; + let event: string | undefined; + const dataLines: string[] = []; + for (const line of raw.split("\n")) { + if (!line || line.startsWith(":")) continue; + const colonIdx = line.indexOf(":"); + const field = colonIdx === -1 ? line : line.slice(0, colonIdx); + let value = colonIdx === -1 ? "" : line.slice(colonIdx + 1); + if (value.startsWith(" ")) value = value.slice(1); + if (field === "event") event = value; + else if (field === "data") dataLines.push(value); + } + if (dataLines.length === 0) return null; + return { event, data: dataLines.join("\n") }; +} + +export function streamManagedAuthEvents( + id: string, + jwt: string, + handlers: ManagedAuthStreamHandlers, + options?: ApiClientOptions, +): () => void { + const ac = new AbortController(); + void (async () => { + try { + const f = getFetch(options); + const res = await f( + `${getBaseUrl(options)}/auth/connections/${id}/events`, + { + method: "GET", + headers: { + Authorization: `Bearer ${jwt}`, + Accept: "text/event-stream", + }, + signal: ac.signal, + }, + ); + if (!res.ok) { + const msg = await parseError(res); + handlers.onError(new ManagedAuthApiError(msg, res.status, msg)); + return; + } + if (!res.body) { + handlers.onError( + new ManagedAuthApiError("SSE response has no body", 500, ""), + ); + return; + } + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buf = ""; + for (;;) { + const { value, done } = await reader.read(); + if (done) break; + buf += decoder.decode(value, { stream: true }); + let sepIdx: number; + while ((sepIdx = buf.indexOf("\n\n")) !== -1) { + const raw = buf.slice(0, sepIdx); + buf = buf.slice(sepIdx + 2); + const msg = parseSSEMessage(raw); + if (!msg) continue; + if (msg.event === "managed_auth_state") { + try { + handlers.onState(JSON.parse(msg.data) as ManagedAuthStateEventData); + } catch { + /* ignore malformed payload */ + } + } else if (msg.event === "error") { + try { + const data = JSON.parse(msg.data) as { + error?: { code?: string; message?: string }; + }; + const message = data.error?.message ?? "Stream error"; + handlers.onError(new ManagedAuthApiError(message, 500, message)); + } catch { + /* ignore malformed payload */ + } + } + } + } + handlers.onClose(); + } catch (err) { + if ((err as { name?: string })?.name === "AbortError") return; + const message = err instanceof Error ? err.message : "Stream failed"; + handlers.onError(new ManagedAuthApiError(message, 0, message)); + } + })(); + return () => ac.abort(); +} diff --git a/packages/managed-auth-react/src/session/useManagedAuthSession.ts b/packages/managed-auth-react/src/session/useManagedAuthSession.ts index 515db26..1239272 100644 --- a/packages/managed-auth-react/src/session/useManagedAuthSession.ts +++ b/packages/managed-auth-react/src/session/useManagedAuthSession.ts @@ -1,13 +1,14 @@ import { useCallback, useEffect, useRef, useState } from "react"; import { exchangeHandoffCode, - ManagedAuthApiError, retrieveManagedAuth, + streamManagedAuthEvents, submitFieldValues, submitMFASelection, submitSignInOption, submitSSOButton, type ApiClientOptions, + type ManagedAuthStateEventData, } from "../lib/api"; import type { AuthErrorPayload, @@ -18,8 +19,8 @@ import type { UIState, } from "../lib/types"; -const POLL_INTERVAL_MS = 2000; -const POST_SUBMIT_DELAY_MS = 2000; +const RECONNECT_BASE_MS = 1000; +const RECONNECT_MAX_MS = 15000; function deriveUIState(state: ManagedAuthResponse): UIState { if (state.flow_status === "FAILED" || state.flow_status === "CANCELED") { @@ -42,6 +43,29 @@ function deriveUIState(state: ManagedAuthResponse): UIState { } } +function isTerminal(uiState: UIState): boolean { + return uiState === "success" || uiState === "expired" || uiState === "error"; +} + +function mergeStateEvent( + base: ManagedAuthResponse, + ev: ManagedAuthStateEventData, +): ManagedAuthResponse { + return { + ...base, + flow_status: ev.flow_status, + flow_step: ev.flow_step, + discovered_fields: ev.discovered_fields ?? null, + pending_sso_buttons: ev.pending_sso_buttons ?? null, + mfa_options: ev.mfa_options ?? null, + sign_in_options: ev.sign_in_options ?? null, + external_action_message: ev.external_action_message ?? null, + website_error: ev.website_error ?? null, + error_message: ev.error_message ?? null, + error_code: ev.error_code ?? null, + }; +} + export interface ManagedAuthSessionOptions extends ApiClientOptions { sessionId: string; handoffCode: string; @@ -66,7 +90,7 @@ export interface ManagedAuthSessionValue { /** * Internal hook that owns the full state machine for a managed auth session — - * handoff code exchange, polling, submissions, UI-state derivation. + * handoff code exchange, SSE subscription, submissions, UI-state derivation. */ export function useManagedAuthSession( options: ManagedAuthSessionOptions, @@ -80,94 +104,132 @@ export function useManagedAuthSession( const [submitError, setSubmitError] = useState(null); const [initError, setInitError] = useState(null); - const pollRef = useRef | null>(null); - const pollDelayRef = useRef | null>(null); + const stateRef = useRef(null); + const disconnectRef = useRef<(() => void) | null>(null); + const reconnectTimerRef = useRef | null>(null); + const reconnectAttemptsRef = useRef(0); + const terminalRef = useRef(false); const callbackFiredRef = useRef<{ success: boolean; error: boolean }>({ success: false, error: false, }); - const stopPolling = useCallback(() => { - if (pollDelayRef.current) { - clearTimeout(pollDelayRef.current); - pollDelayRef.current = null; + const fireSuccessOnce = useCallback( + (payload: AuthSuccessPayload) => { + if (callbackFiredRef.current.success) return; + callbackFiredRef.current.success = true; + onSuccess?.(payload); + }, + [onSuccess], + ); + + const fireErrorOnce = useCallback( + (payload: AuthErrorPayload) => { + if (callbackFiredRef.current.error) return; + callbackFiredRef.current.error = true; + onError?.(payload); + }, + [onError], + ); + + const disconnectStream = useCallback(() => { + if (reconnectTimerRef.current) { + clearTimeout(reconnectTimerRef.current); + reconnectTimerRef.current = null; } - if (pollRef.current) { - clearInterval(pollRef.current); - pollRef.current = null; + if (disconnectRef.current) { + disconnectRef.current(); + disconnectRef.current = null; } }, []); - const pollOnce = useCallback( - async (tokenOverride?: string) => { - const token = tokenOverride ?? jwt; - if (!token) return; - try { - const newState = await retrieveManagedAuth(sessionId, token, options); - setState(newState); - setSubmitError(null); + const connectStream = useCallback( + (token: string) => { + if (terminalRef.current) return; + if (disconnectRef.current) return; - const nextUI = deriveUIState(newState); + const handleStateEvent = (ev: ManagedAuthStateEventData) => { + reconnectAttemptsRef.current = 0; + setSubmitError(null); + const base = stateRef.current; + if (!base) return; + const merged = mergeStateEvent(base, ev); + stateRef.current = merged; + setState(merged); + const nextUI = deriveUIState(merged); setUIState(nextUI); - if (nextUI === "success") { - if (!callbackFiredRef.current.success) { - callbackFiredRef.current.success = true; - onSuccess?.({ - profileName: newState.profile_name, - domain: newState.domain, - }); - } - stopPolling(); + terminalRef.current = true; + fireSuccessOnce({ + profileName: merged.profile_name, + domain: merged.domain, + }); + disconnectStream(); } else if (nextUI === "error" || nextUI === "expired") { - if (!callbackFiredRef.current.error) { - callbackFiredRef.current.error = true; - onError?.({ - code: newState.error_code ?? undefined, - message: - newState.error_message || - newState.website_error || - (nextUI === "expired" ? "Session expired" : "Login failed"), - }); - } - stopPolling(); + terminalRef.current = true; + fireErrorOnce({ + code: merged.error_code ?? undefined, + message: + merged.error_message || + merged.website_error || + (nextUI === "expired" ? "Session expired" : "Login failed"), + }); + disconnectStream(); } - } catch (err) { - const apiErr = err as ManagedAuthApiError; - if (apiErr?.status === 401 || apiErr?.status === 410) { - stopPolling(); - setUIState("expired"); - if (!callbackFiredRef.current.error) { - callbackFiredRef.current.error = true; - onError?.({ message: "Session expired" }); - } - } - } - }, - [jwt, onError, onSuccess, options, sessionId, stopPolling], - ); + }; - const startPolling = useCallback( - (immediate = true, delayMs = 0, tokenOverride?: string) => { - if (pollRef.current) return; - const begin = () => { - if (pollRef.current) return; - pollRef.current = setInterval(() => { - void pollOnce(tokenOverride); - }, POLL_INTERVAL_MS); - if (immediate) void pollOnce(tokenOverride); + const scheduleReconnect = () => { + if (terminalRef.current) return; + const attempt = reconnectAttemptsRef.current++; + const delay = Math.min( + RECONNECT_BASE_MS * Math.pow(2, attempt), + RECONNECT_MAX_MS, + ); + reconnectTimerRef.current = setTimeout(() => { + reconnectTimerRef.current = null; + connectStream(token); + }, delay); }; - if (delayMs > 0) { - pollDelayRef.current = setTimeout(begin, delayMs); - } else { - begin(); - } + + disconnectRef.current = streamManagedAuthEvents( + sessionId, + token, + { + onState: handleStateEvent, + onError: (err) => { + disconnectRef.current = null; + if (err.status === 401 || err.status === 410) { + terminalRef.current = true; + setUIState("expired"); + fireErrorOnce({ message: "Session expired" }); + return; + } + scheduleReconnect(); + }, + onClose: () => { + disconnectRef.current = null; + if (terminalRef.current) return; + scheduleReconnect(); + }, + }, + options, + ); }, - [pollOnce], + [ + disconnectStream, + fireErrorOnce, + fireSuccessOnce, + options, + sessionId, + ], ); useEffect(() => { let cancelled = false; + terminalRef.current = false; + reconnectAttemptsRef.current = 0; + callbackFiredRef.current = { success: false, error: false }; + (async () => { try { const token = await exchangeHandoffCode( @@ -179,26 +241,19 @@ export function useManagedAuthSession( setJwt(token); const initial = await retrieveManagedAuth(sessionId, token, options); if (cancelled) return; + stateRef.current = initial; setState(initial); const derived = deriveUIState(initial); - if ( - derived === "success" || - derived === "expired" || - derived === "error" - ) { + if (isTerminal(derived)) { + terminalRef.current = true; setUIState(derived); - if (derived === "success" && !callbackFiredRef.current.success) { - callbackFiredRef.current.success = true; - onSuccess?.({ + if (derived === "success") { + fireSuccessOnce({ profileName: initial.profile_name, domain: initial.domain, }); - } else if ( - (derived === "error" || derived === "expired") && - !callbackFiredRef.current.error - ) { - callbackFiredRef.current.error = true; - onError?.({ + } else { + fireErrorOnce({ code: initial.error_code ?? undefined, message: initial.error_message || @@ -208,7 +263,7 @@ export function useManagedAuthSession( } } else if (autoStart) { setUIState("discovering"); - startPolling(true, 0, token); + connectStream(token); } else { setUIState("prime"); } @@ -218,15 +273,13 @@ export function useManagedAuthSession( err instanceof Error ? err.message : "Failed to start session"; setInitError(message); setUIState("error"); - if (!callbackFiredRef.current.error) { - callbackFiredRef.current.error = true; - onError?.({ message }); - } + terminalRef.current = true; + fireErrorOnce({ message }); } })(); return () => { cancelled = true; - stopPolling(); + disconnectStream(); }; // eslint-disable-next-line react-hooks/exhaustive-deps }, [sessionId, handoffCode]); @@ -234,8 +287,8 @@ export function useManagedAuthSession( const startFlow = useCallback(() => { if (!jwt) return; setUIState("discovering"); - startPolling(true, 0); - }, [jwt, startPolling]); + connectStream(jwt); + }, [jwt, connectStream]); const submit = useCallback( async (fn: () => Promise, onFail: string) => { @@ -243,24 +296,19 @@ export function useManagedAuthSession( setIsSubmitting(true); setSubmitError(null); setUIState("submitting"); - stopPolling(); try { await fn(); - startPolling(false, POST_SUBMIT_DELAY_MS); } catch (err) { const msg = err instanceof Error ? err.message : onFail; setSubmitError(msg); setUIState((current) => - current === "success" || current === "expired" || current === "error" - ? current - : "awaiting_input", + isTerminal(current) ? current : "awaiting_input", ); - startPolling(); } finally { setIsSubmitting(false); } }, - [jwt, startPolling, stopPolling], + [jwt], ); const submitFields = useCallback( From b649e9e49619c481afda0d713e68ff16e11a1139 Mon Sep 17 00:00:00 2001 From: masnwilliams <43387599+masnwilliams@users.noreply.github.com> Date: Mon, 4 May 2026 00:55:37 +0000 Subject: [PATCH 2/5] chore: prettier format Co-Authored-By: Claude Opus 4.7 --- packages/managed-auth-react/src/lib/api.ts | 4 +++- .../src/session/useManagedAuthSession.ts | 8 +------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/packages/managed-auth-react/src/lib/api.ts b/packages/managed-auth-react/src/lib/api.ts index 9431eaa..8404df2 100644 --- a/packages/managed-auth-react/src/lib/api.ts +++ b/packages/managed-auth-react/src/lib/api.ts @@ -260,7 +260,9 @@ export function streamManagedAuthEvents( if (!msg) continue; if (msg.event === "managed_auth_state") { try { - handlers.onState(JSON.parse(msg.data) as ManagedAuthStateEventData); + handlers.onState( + JSON.parse(msg.data) as ManagedAuthStateEventData, + ); } catch { /* ignore malformed payload */ } diff --git a/packages/managed-auth-react/src/session/useManagedAuthSession.ts b/packages/managed-auth-react/src/session/useManagedAuthSession.ts index 1239272..8c5bd50 100644 --- a/packages/managed-auth-react/src/session/useManagedAuthSession.ts +++ b/packages/managed-auth-react/src/session/useManagedAuthSession.ts @@ -215,13 +215,7 @@ export function useManagedAuthSession( options, ); }, - [ - disconnectStream, - fireErrorOnce, - fireSuccessOnce, - options, - sessionId, - ], + [disconnectStream, fireErrorOnce, fireSuccessOnce, options, sessionId], ); useEffect(() => { From 47ad3a0c1fddb837716f2c2f6afe129037f6922c Mon Sep 17 00:00:00 2001 From: masnwilliams <43387599+masnwilliams@users.noreply.github.com> Date: Mon, 4 May 2026 16:13:32 +0000 Subject: [PATCH 3/5] fix: terminate SSE read loop after server-emitted error event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous error-event branch fired handlers.onError but kept reading from the stream. The hook treats onError as terminal — it nulls the disconnect ref and schedules a reconnect — so the original stream was left running alongside the new one, both feeding state events into the hook and racing on disconnectRef on close. Abort the controller and return after dispatching the error so the stream contract matches the hook's expectations: onError fires at most once and is mutually exclusive with onClose. Co-Authored-By: Claude Opus 4.7 --- packages/managed-auth-react/src/lib/api.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/managed-auth-react/src/lib/api.ts b/packages/managed-auth-react/src/lib/api.ts index 8404df2..1d17d40 100644 --- a/packages/managed-auth-react/src/lib/api.ts +++ b/packages/managed-auth-react/src/lib/api.ts @@ -267,15 +267,18 @@ export function streamManagedAuthEvents( /* ignore malformed payload */ } } else if (msg.event === "error") { + let message = "Stream error"; try { const data = JSON.parse(msg.data) as { error?: { code?: string; message?: string }; }; - const message = data.error?.message ?? "Stream error"; - handlers.onError(new ManagedAuthApiError(message, 500, message)); + if (data.error?.message) message = data.error.message; } catch { - /* ignore malformed payload */ + /* fall through with default message */ } + handlers.onError(new ManagedAuthApiError(message, 500, message)); + ac.abort(); + return; } } } From 845769bccf4ba79e7bcd8b066adeb927c3eb1bcf Mon Sep 17 00:00:00 2001 From: masnwilliams <43387599+masnwilliams@users.noreply.github.com> Date: Tue, 5 May 2026 23:13:13 +0000 Subject: [PATCH 4/5] refactor: colocate ManagedAuthStateEventData with other vendored types Group all SDK-mirrored protocol types in lib/types.ts so the drift surface against @onkernel/sdk is audited in one place. api.ts re-exports the type to keep the existing import path stable. Co-Authored-By: Claude Opus 4.7 --- packages/managed-auth-react/src/lib/api.ts | 28 +++----------------- packages/managed-auth-react/src/lib/types.ts | 20 ++++++++++++++ 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/packages/managed-auth-react/src/lib/api.ts b/packages/managed-auth-react/src/lib/api.ts index 1d17d40..3dac0b9 100644 --- a/packages/managed-auth-react/src/lib/api.ts +++ b/packages/managed-auth-react/src/lib/api.ts @@ -1,14 +1,11 @@ import type { - DiscoveredField, - FlowStatus, - FlowStep, ManagedAuthResponse, - MFAOption, + ManagedAuthStateEventData, MFAType, - SSOButton, - SignInOption, } from "./types"; +export type { ManagedAuthStateEventData }; + export interface ApiClientOptions { baseUrl?: string; fetch?: typeof fetch; @@ -166,25 +163,6 @@ export function submitSignInOption( ); } -export interface ManagedAuthStateEventData { - event: "managed_auth_state"; - timestamp: string; - flow_status: FlowStatus; - flow_step: FlowStep; - flow_type?: "LOGIN" | "REAUTH"; - discovered_fields?: DiscoveredField[]; - mfa_options?: MFAOption[]; - sign_in_options?: SignInOption[]; - pending_sso_buttons?: SSOButton[]; - external_action_message?: string; - website_error?: string; - error_message?: string; - error_code?: string; - post_login_url?: string; - live_view_url?: string; - hosted_url?: string; -} - export interface ManagedAuthStreamHandlers { onState: (event: ManagedAuthStateEventData) => void; onError: (err: ManagedAuthApiError) => void; diff --git a/packages/managed-auth-react/src/lib/types.ts b/packages/managed-auth-react/src/lib/types.ts index fbce640..3caf3ef 100644 --- a/packages/managed-auth-react/src/lib/types.ts +++ b/packages/managed-auth-react/src/lib/types.ts @@ -73,6 +73,26 @@ export interface ManagedAuthResponse { error_code?: string | null; } +// Mirrors @onkernel/sdk's ConnectionFollowResponse.ManagedAuthStateEvent. +export interface ManagedAuthStateEventData { + event: "managed_auth_state"; + timestamp: string; + flow_status: FlowStatus; + flow_step: FlowStep; + flow_type?: "LOGIN" | "REAUTH"; + discovered_fields?: DiscoveredField[]; + mfa_options?: MFAOption[]; + sign_in_options?: SignInOption[]; + pending_sso_buttons?: SSOButton[]; + external_action_message?: string; + website_error?: string; + error_message?: string; + error_code?: string; + post_login_url?: string; + live_view_url?: string; + hosted_url?: string; +} + export type UIState = | "prime" | "discovering" From c20d90b5aabd305d8db098bac7efd29a99632c16 Mon Sep 17 00:00:00 2001 From: masnwilliams <43387599+masnwilliams@users.noreply.github.com> Date: Tue, 5 May 2026 23:39:36 +0000 Subject: [PATCH 5/5] fix: harden SSE stream handling for reconnect, fatal errors, CRLF - ManagedAuthApiError gains a fatal flag. Server-emitted SSE error events now mark themselves fatal; the hook treats fatal errors as terminal instead of looping reconnect forever. - On reconnect after a stream drop, refetch the current session via GET /auth/connections/{id} before resubscribing, so we don't miss state changes that happened during the disconnect window. - SSE message-separator + line-split now handle \r\n and \r in addition to \n, per the spec. Co-Authored-By: Claude Opus 4.7 --- packages/managed-auth-react/src/lib/api.ts | 22 +++++--- .../src/session/useManagedAuthSession.ts | 51 ++++++++++++++++++- 2 files changed, 65 insertions(+), 8 deletions(-) diff --git a/packages/managed-auth-react/src/lib/api.ts b/packages/managed-auth-react/src/lib/api.ts index 3dac0b9..fba39b6 100644 --- a/packages/managed-auth-react/src/lib/api.ts +++ b/packages/managed-auth-react/src/lib/api.ts @@ -16,11 +16,13 @@ const DEFAULT_BASE_URL = "https://api.onkernel.com"; export class ManagedAuthApiError extends Error { public readonly status: number; public readonly body: string; - constructor(message: string, status: number, body: string) { + public readonly fatal: boolean; + constructor(message: string, status: number, body: string, fatal = false) { super(message); this.name = "ManagedAuthApiError"; this.status = status; this.body = body; + this.fatal = fatal; } } @@ -178,7 +180,7 @@ function parseSSEMessage(raw: string): ParsedSSEMessage | null { if (!raw.trim()) return null; let event: string | undefined; const dataLines: string[] = []; - for (const line of raw.split("\n")) { + for (const line of raw.split(/\r\n|\r|\n/)) { if (!line || line.startsWith(":")) continue; const colonIdx = line.indexOf(":"); const field = colonIdx === -1 ? line : line.slice(0, colonIdx); @@ -226,14 +228,18 @@ export function streamManagedAuthEvents( const reader = res.body.getReader(); const decoder = new TextDecoder(); let buf = ""; + // SSE message separator: blank line. Per spec, line endings can be + // \n, \r\n, or \r — so the separator can be \n\n, \r\n\r\n, or \r\r. + const SEPARATOR_RE = /\r\n\r\n|\r\r|\n\n/; for (;;) { const { value, done } = await reader.read(); if (done) break; buf += decoder.decode(value, { stream: true }); - let sepIdx: number; - while ((sepIdx = buf.indexOf("\n\n")) !== -1) { - const raw = buf.slice(0, sepIdx); - buf = buf.slice(sepIdx + 2); + for (;;) { + const match = SEPARATOR_RE.exec(buf); + if (!match) break; + const raw = buf.slice(0, match.index); + buf = buf.slice(match.index + match[0].length); const msg = parseSSEMessage(raw); if (!msg) continue; if (msg.event === "managed_auth_state") { @@ -254,7 +260,9 @@ export function streamManagedAuthEvents( } catch { /* fall through with default message */ } - handlers.onError(new ManagedAuthApiError(message, 500, message)); + handlers.onError( + new ManagedAuthApiError(message, 500, message, true), + ); ac.abort(); return; } diff --git a/packages/managed-auth-react/src/session/useManagedAuthSession.ts b/packages/managed-auth-react/src/session/useManagedAuthSession.ts index 8c5bd50..6cceb57 100644 --- a/packages/managed-auth-react/src/session/useManagedAuthSession.ts +++ b/packages/managed-auth-react/src/session/useManagedAuthSession.ts @@ -187,10 +187,53 @@ export function useManagedAuthSession( ); reconnectTimerRef.current = setTimeout(() => { reconnectTimerRef.current = null; - connectStream(token); + void resyncAndConnect(token); }, delay); }; + // SSE only emits future deltas. After a drop, resync via GET so we don't + // miss state changes that happened during the disconnect window before + // resubscribing to the stream. + const resyncAndConnect = async (t: string) => { + if (terminalRef.current) return; + try { + const fresh = await retrieveManagedAuth(sessionId, t, options); + if (terminalRef.current) return; + stateRef.current = fresh; + setState(fresh); + const derived = deriveUIState(fresh); + setUIState(derived); + if (isTerminal(derived)) { + terminalRef.current = true; + if (derived === "success") { + fireSuccessOnce({ + profileName: fresh.profile_name, + domain: fresh.domain, + }); + } else { + fireErrorOnce({ + code: fresh.error_code ?? undefined, + message: + fresh.error_message || + fresh.website_error || + (derived === "expired" ? "Session expired" : "Login failed"), + }); + } + return; + } + connectStream(t); + } catch (err) { + const status = (err as { status?: number })?.status; + if (status === 401 || status === 410) { + terminalRef.current = true; + setUIState("expired"); + fireErrorOnce({ message: "Session expired" }); + return; + } + scheduleReconnect(); + } + }; + disconnectRef.current = streamManagedAuthEvents( sessionId, token, @@ -204,6 +247,12 @@ export function useManagedAuthSession( fireErrorOnce({ message: "Session expired" }); return; } + if (err.fatal) { + terminalRef.current = true; + setUIState("error"); + fireErrorOnce({ message: err.message }); + return; + } scheduleReconnect(); }, onClose: () => {