Skip to content

Commit bad78cc

Browse files
improvement(sockets): workflow switching state machine (#4104)
* improvement(sockets): workflow switching state machine * address comments
1 parent 8bbca9b commit bad78cc

File tree

13 files changed

+1298
-142
lines changed

13 files changed

+1298
-142
lines changed

apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -59,40 +59,61 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
5959
const hasOperationError = useOperationQueueStore((state) => state.hasOperationError)
6060
const addNotification = useNotificationStore((state) => state.addNotification)
6161
const removeNotification = useNotificationStore((state) => state.removeNotification)
62-
const { isReconnecting } = useSocket()
63-
const reconnectingNotificationIdRef = useRef<string | null>(null)
62+
const { isReconnecting, isRetryingWorkflowJoin } = useSocket()
63+
const realtimeStatusNotificationIdRef = useRef<string | null>(null)
64+
const realtimeStatusNotificationMessageRef = useRef<string | null>(null)
6465

6566
const isOfflineMode = hasOperationError
67+
const realtimeStatusMessage = isReconnecting
68+
? 'Reconnecting...'
69+
: isRetryingWorkflowJoin
70+
? 'Joining workflow...'
71+
: null
72+
73+
const clearRealtimeStatusNotification = useCallback(() => {
74+
if (!realtimeStatusNotificationIdRef.current) {
75+
return
76+
}
77+
78+
removeNotification(realtimeStatusNotificationIdRef.current)
79+
realtimeStatusNotificationIdRef.current = null
80+
realtimeStatusNotificationMessageRef.current = null
81+
}, [removeNotification])
6682

6783
useEffect(() => {
68-
if (isReconnecting && !reconnectingNotificationIdRef.current && !isOfflineMode) {
69-
const id = addNotification({
70-
level: 'error',
71-
message: 'Reconnecting...',
72-
})
73-
reconnectingNotificationIdRef.current = id
74-
} else if (!isReconnecting && reconnectingNotificationIdRef.current) {
75-
removeNotification(reconnectingNotificationIdRef.current)
76-
reconnectingNotificationIdRef.current = null
84+
if (isOfflineMode || !realtimeStatusMessage) {
85+
clearRealtimeStatusNotification()
86+
return
7787
}
7888

79-
return () => {
80-
if (reconnectingNotificationIdRef.current) {
81-
removeNotification(reconnectingNotificationIdRef.current)
82-
reconnectingNotificationIdRef.current = null
83-
}
89+
if (
90+
realtimeStatusNotificationIdRef.current &&
91+
realtimeStatusNotificationMessageRef.current === realtimeStatusMessage
92+
) {
93+
return
8494
}
85-
}, [isReconnecting, isOfflineMode, addNotification, removeNotification])
95+
96+
clearRealtimeStatusNotification()
97+
98+
const id = addNotification({
99+
level: 'error',
100+
message: realtimeStatusMessage,
101+
})
102+
103+
realtimeStatusNotificationIdRef.current = id
104+
realtimeStatusNotificationMessageRef.current = realtimeStatusMessage
105+
}, [addNotification, clearRealtimeStatusNotification, isOfflineMode, realtimeStatusMessage])
106+
107+
useEffect(() => {
108+
return clearRealtimeStatusNotification
109+
}, [clearRealtimeStatusNotification])
86110

87111
useEffect(() => {
88112
if (!isOfflineMode || hasShownOfflineNotification) {
89113
return
90114
}
91115

92-
if (reconnectingNotificationIdRef.current) {
93-
removeNotification(reconnectingNotificationIdRef.current)
94-
reconnectingNotificationIdRef.current = null
95-
}
116+
clearRealtimeStatusNotification()
96117

97118
try {
98119
addNotification({
@@ -107,7 +128,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
107128
} catch (error) {
108129
logger.error('Failed to add offline notification', { error })
109130
}
110-
}, [addNotification, removeNotification, hasShownOfflineNotification, isOfflineMode])
131+
}, [addNotification, clearRealtimeStatusNotification, hasShownOfflineNotification, isOfflineMode])
111132

112133
const {
113134
data: workspacePermissions,

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/cursors/cursors.tsx

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { useViewport } from 'reactflow'
55
import { getUserColor } from '@/lib/workspaces/colors'
66
import { usePreventZoom } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
77
import { useSocket } from '@/app/workspace/providers/socket-provider'
8+
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
89

910
interface CursorPoint {
1011
x: number
@@ -19,11 +20,16 @@ interface CursorRenderData {
1920
}
2021

2122
const CursorsComponent = () => {
22-
const { presenceUsers, currentSocketId } = useSocket()
23+
const activeWorkflowId = useWorkflowRegistry((state) => state.activeWorkflowId)
24+
const { currentWorkflowId, presenceUsers, currentSocketId } = useSocket()
2325
const viewport = useViewport()
2426
const preventZoomRef = usePreventZoom()
2527

2628
const cursors = useMemo<CursorRenderData[]>(() => {
29+
if (!activeWorkflowId || currentWorkflowId !== activeWorkflowId) {
30+
return []
31+
}
32+
2733
return presenceUsers
2834
.filter((user): user is typeof user & { cursor: CursorPoint } => Boolean(user.cursor))
2935
.filter((user) => user.socketId !== currentSocketId)
@@ -33,7 +39,7 @@ const CursorsComponent = () => {
3339
cursor: user.cursor,
3440
color: getUserColor(user.userId),
3541
}))
36-
}, [currentSocketId, presenceUsers])
42+
}, [activeWorkflowId, currentSocketId, currentWorkflowId, presenceUsers])
3743

3844
if (!cursors.length) {
3945
return null
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
import { describe, expect, it } from 'vitest'
2+
import {
3+
SOCKET_JOIN_RETRY_BASE_DELAY_MS,
4+
SOCKET_JOIN_RETRY_MAX_DELAY_MS,
5+
SocketJoinController,
6+
} from '@/app/workspace/providers/socket-join-controller'
7+
8+
describe('SocketJoinController', () => {
9+
it('blocks rejoining a deleted workflow until the desired workflow changes', () => {
10+
const controller = new SocketJoinController()
11+
12+
expect(controller.setConnected(true)).toEqual([])
13+
expect(controller.requestWorkflow('workflow-a')).toEqual([
14+
{ type: 'join', workflowId: 'workflow-a' },
15+
])
16+
expect(controller.handleJoinSuccess('workflow-a')).toMatchObject({
17+
apply: true,
18+
ignored: false,
19+
commands: [],
20+
workflowId: 'workflow-a',
21+
})
22+
23+
expect(controller.handleWorkflowDeleted('workflow-a')).toEqual({
24+
shouldClearCurrent: true,
25+
commands: [],
26+
})
27+
expect(controller.requestWorkflow('workflow-a')).toEqual([])
28+
expect(controller.requestWorkflow('workflow-b')).toEqual([
29+
{ type: 'join', workflowId: 'workflow-b' },
30+
])
31+
})
32+
33+
it('joins only the latest desired workflow after rapid A to B to C switching', () => {
34+
const controller = new SocketJoinController()
35+
36+
controller.setConnected(true)
37+
controller.requestWorkflow('workflow-a')
38+
controller.handleJoinSuccess('workflow-a')
39+
40+
expect(controller.requestWorkflow('workflow-b')).toEqual([
41+
{ type: 'join', workflowId: 'workflow-b' },
42+
])
43+
expect(controller.requestWorkflow('workflow-c')).toEqual([])
44+
45+
expect(controller.handleJoinSuccess('workflow-b')).toMatchObject({
46+
apply: false,
47+
ignored: true,
48+
workflowId: 'workflow-b',
49+
commands: [{ type: 'join', workflowId: 'workflow-c' }],
50+
})
51+
expect(controller.handleJoinSuccess('workflow-c')).toMatchObject({
52+
apply: true,
53+
ignored: false,
54+
workflowId: 'workflow-c',
55+
commands: [],
56+
})
57+
})
58+
59+
it('rejoins the original workflow when a stale success lands after switching back', () => {
60+
const controller = new SocketJoinController()
61+
62+
controller.setConnected(true)
63+
controller.requestWorkflow('workflow-a')
64+
controller.handleJoinSuccess('workflow-a')
65+
66+
expect(controller.requestWorkflow('workflow-b')).toEqual([
67+
{ type: 'join', workflowId: 'workflow-b' },
68+
])
69+
expect(controller.requestWorkflow('workflow-a')).toEqual([])
70+
71+
expect(controller.handleJoinSuccess('workflow-b')).toMatchObject({
72+
apply: false,
73+
ignored: true,
74+
workflowId: 'workflow-b',
75+
commands: [{ type: 'join', workflowId: 'workflow-a' }],
76+
})
77+
expect(controller.handleJoinSuccess('workflow-a')).toMatchObject({
78+
apply: true,
79+
ignored: false,
80+
workflowId: 'workflow-a',
81+
commands: [],
82+
})
83+
})
84+
85+
it('leaves the room when a late join succeeds after navigating away', () => {
86+
const controller = new SocketJoinController()
87+
88+
controller.setConnected(true)
89+
controller.requestWorkflow('workflow-a')
90+
controller.handleJoinSuccess('workflow-a')
91+
92+
expect(controller.requestWorkflow('workflow-b')).toEqual([
93+
{ type: 'join', workflowId: 'workflow-b' },
94+
])
95+
expect(controller.requestWorkflow(null)).toEqual([])
96+
97+
expect(controller.handleJoinSuccess('workflow-b')).toMatchObject({
98+
apply: false,
99+
ignored: true,
100+
workflowId: 'workflow-b',
101+
commands: [{ type: 'leave' }],
102+
})
103+
})
104+
105+
it('preserves the last joined workflow during retryable switch failures', () => {
106+
const controller = new SocketJoinController()
107+
108+
controller.setConnected(true)
109+
expect(controller.requestWorkflow('workflow-a')).toEqual([
110+
{ type: 'join', workflowId: 'workflow-a' },
111+
])
112+
controller.handleJoinSuccess('workflow-a')
113+
114+
expect(controller.requestWorkflow('workflow-b')).toEqual([
115+
{ type: 'join', workflowId: 'workflow-b' },
116+
])
117+
118+
const errorResult = controller.handleJoinError({
119+
workflowId: 'workflow-b',
120+
retryable: true,
121+
})
122+
123+
expect(errorResult.apply).toBe(false)
124+
expect(errorResult.retryScheduled).toBe(true)
125+
expect(errorResult.commands).toEqual([
126+
{
127+
type: 'schedule-retry',
128+
workflowId: 'workflow-b',
129+
attempt: 1,
130+
delayMs: SOCKET_JOIN_RETRY_BASE_DELAY_MS,
131+
},
132+
])
133+
expect(controller.getJoinedWorkflowId()).toBe('workflow-a')
134+
expect(controller.retryJoin('workflow-b')).toEqual([{ type: 'join', workflowId: 'workflow-b' }])
135+
})
136+
137+
it('uses capped exponential backoff for retryable join failures', () => {
138+
const controller = new SocketJoinController()
139+
140+
controller.setConnected(true)
141+
controller.requestWorkflow('workflow-a')
142+
143+
const first = controller.handleJoinError({ workflowId: 'workflow-a', retryable: true })
144+
expect(first.commands).toEqual([
145+
{
146+
type: 'schedule-retry',
147+
workflowId: 'workflow-a',
148+
attempt: 1,
149+
delayMs: SOCKET_JOIN_RETRY_BASE_DELAY_MS,
150+
},
151+
])
152+
153+
controller.retryJoin('workflow-a')
154+
const second = controller.handleJoinError({ workflowId: 'workflow-a', retryable: true })
155+
expect(second.commands).toEqual([
156+
{
157+
type: 'schedule-retry',
158+
workflowId: 'workflow-a',
159+
attempt: 2,
160+
delayMs: SOCKET_JOIN_RETRY_BASE_DELAY_MS * 2,
161+
},
162+
])
163+
164+
controller.retryJoin('workflow-a')
165+
controller.handleJoinError({ workflowId: 'workflow-a', retryable: true })
166+
controller.retryJoin('workflow-a')
167+
const fourth = controller.handleJoinError({ workflowId: 'workflow-a', retryable: true })
168+
expect(fourth.commands).toEqual([
169+
{
170+
type: 'schedule-retry',
171+
workflowId: 'workflow-a',
172+
attempt: 4,
173+
delayMs: SOCKET_JOIN_RETRY_BASE_DELAY_MS * 8,
174+
},
175+
])
176+
177+
controller.retryJoin('workflow-a')
178+
const fifth = controller.handleJoinError({ workflowId: 'workflow-a', retryable: true })
179+
expect(fifth.commands).toEqual([
180+
{
181+
type: 'schedule-retry',
182+
workflowId: 'workflow-a',
183+
attempt: 5,
184+
delayMs: SOCKET_JOIN_RETRY_MAX_DELAY_MS,
185+
},
186+
])
187+
})
188+
189+
it('blocks a permanently failed workflow and leaves the fallback room cleanly', () => {
190+
const controller = new SocketJoinController()
191+
192+
controller.setConnected(true)
193+
controller.requestWorkflow('workflow-a')
194+
controller.handleJoinSuccess('workflow-a')
195+
196+
expect(controller.requestWorkflow('workflow-b')).toEqual([
197+
{ type: 'join', workflowId: 'workflow-b' },
198+
])
199+
200+
const errorResult = controller.handleJoinError({
201+
workflowId: 'workflow-b',
202+
retryable: false,
203+
})
204+
205+
expect(errorResult.apply).toBe(true)
206+
expect(errorResult.commands).toEqual([{ type: 'leave' }])
207+
expect(controller.getJoinedWorkflowId()).toBeNull()
208+
expect(controller.requestWorkflow('workflow-b')).toEqual([])
209+
expect(controller.requestWorkflow('workflow-c')).toEqual([
210+
{ type: 'join', workflowId: 'workflow-c' },
211+
])
212+
})
213+
214+
it('rejoins the desired workflow when the server session is lost', () => {
215+
const controller = new SocketJoinController()
216+
217+
controller.setConnected(true)
218+
controller.requestWorkflow('workflow-a')
219+
controller.handleJoinSuccess('workflow-a')
220+
221+
expect(controller.forceRejoinWorkflow('workflow-a')).toEqual([
222+
{ type: 'join', workflowId: 'workflow-a' },
223+
])
224+
expect(controller.getJoinedWorkflowId()).toBeNull()
225+
})
226+
227+
it('resolves retryable errors without workflowId against the pending join', () => {
228+
const controller = new SocketJoinController()
229+
230+
controller.setConnected(true)
231+
controller.requestWorkflow('workflow-a')
232+
233+
const errorResult = controller.handleJoinError({ retryable: true })
234+
235+
expect(errorResult.workflowId).toBe('workflow-a')
236+
expect(errorResult.retryScheduled).toBe(true)
237+
expect(errorResult.commands).toEqual([
238+
{
239+
type: 'schedule-retry',
240+
workflowId: 'workflow-a',
241+
attempt: 1,
242+
delayMs: SOCKET_JOIN_RETRY_BASE_DELAY_MS,
243+
},
244+
])
245+
})
246+
})

0 commit comments

Comments
 (0)