From 5c9d711378c0985b0dbe9a801bbb301b0e5436a5 Mon Sep 17 00:00:00 2001 From: waleed Date: Sun, 14 Jun 2026 21:11:00 -0700 Subject: [PATCH] fix(realtime): re-validate socket role and evict revoked collaborators Socket.io authorized workflow access only at join and cached the workspace role in presence, so a removed or downgraded collaborator kept live read/ write access until they disconnected. - Re-validate the cached role against the permissions table on mutating events, bounded by a short TTL; refresh or evict on change - Add /api/permissions-updated so the app reconciles active rooms, evicting revoked users (cross-pod) and refreshing downgraded roles - Notify realtime on workspace member removal and permission changes --- apps/realtime/src/handlers/eviction.ts | 24 ++ apps/realtime/src/handlers/operations.ts | 40 ++- apps/realtime/src/handlers/subblocks.ts | 37 ++- apps/realtime/src/handlers/variables.ts | 37 ++- apps/realtime/src/handlers/workflow.ts | 1 + apps/realtime/src/index.test.ts | 5 + .../authorize-socket-operation.test.ts | 248 ++++++++++++++++++ apps/realtime/src/middleware/permissions.ts | 65 +++++ apps/realtime/src/rooms/access.test.ts | 162 ++++++++++++ apps/realtime/src/rooms/access.ts | 79 ++++++ apps/realtime/src/rooms/memory-manager.ts | 17 ++ apps/realtime/src/rooms/redis-manager.ts | 58 ++++ apps/realtime/src/rooms/types.ts | 19 ++ apps/realtime/src/routes/http.ts | 19 ++ .../src/routes/permissions-updated.test.ts | 115 ++++++++ .../api/workspaces/[id]/permissions/route.ts | 9 + .../app/api/workspaces/members/[id]/route.ts | 5 + .../lib/workspaces/permissions/realtime.ts | 40 +++ 18 files changed, 964 insertions(+), 16 deletions(-) create mode 100644 apps/realtime/src/handlers/eviction.ts create mode 100644 apps/realtime/src/middleware/authorize-socket-operation.test.ts create mode 100644 apps/realtime/src/rooms/access.test.ts create mode 100644 apps/realtime/src/rooms/access.ts create mode 100644 apps/realtime/src/routes/permissions-updated.test.ts create mode 100644 apps/sim/lib/workspaces/permissions/realtime.ts diff --git a/apps/realtime/src/handlers/eviction.ts b/apps/realtime/src/handlers/eviction.ts new file mode 100644 index 00000000000..cfb69f935a3 --- /dev/null +++ b/apps/realtime/src/handlers/eviction.ts @@ -0,0 +1,24 @@ +import { createLogger } from '@sim/logger' +import type { AuthenticatedSocket } from '@/middleware/auth' +import type { IRoomManager } from '@/rooms' + +const logger = createLogger('SocketEviction') + +/** + * Removes the calling socket from a workflow room after its access has been + * revoked mid-session, so it immediately stops receiving broadcasts and can no + * longer mutate workflow state. + */ +export async function evictRevokedSocket( + roomManager: IRoomManager, + socket: AuthenticatedSocket, + workflowId: string +): Promise { + try { + socket.leave(workflowId) + await roomManager.removeUserFromRoom(socket.id, workflowId) + await roomManager.broadcastPresenceUpdate(workflowId) + } catch (error) { + logger.error(`Failed to evict revoked socket ${socket.id} from workflow ${workflowId}`, error) + } +} diff --git a/apps/realtime/src/handlers/operations.ts b/apps/realtime/src/handlers/operations.ts index 3fa4ac0f8e7..cb0298e6509 100644 --- a/apps/realtime/src/handlers/operations.ts +++ b/apps/realtime/src/handlers/operations.ts @@ -14,8 +14,9 @@ import { generateId } from '@sim/utils/id' import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz' import { ZodError } from 'zod' import { persistWorkflowOperation } from '@/database/operations' +import { evictRevokedSocket } from '@/handlers/eviction' import type { AuthenticatedSocket } from '@/middleware/auth' -import { checkRolePermission } from '@/middleware/permissions' +import { authorizeSocketOperation } from '@/middleware/permissions' import type { IRoomManager, UserSession } from '@/rooms' const logger = createLogger('OperationsHandlers') @@ -125,15 +126,42 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() }) - // Check permissions using cached role (no DB query) - const permissionCheck = checkRolePermission(userPresence.role, operation) - if (!permissionCheck.allowed) { + // Re-validate the cached role against the live permissions table (bounded + // by a short TTL) so a revoked or downgraded collaborator cannot keep + // mutating the workflow on an already-connected socket. + const authorization = await authorizeSocketOperation({ + roomManager, + workflowId, + socketId: socket.id, + userId: session.userId, + presence: userPresence, + operation, + }) + + if (authorization.accessRevoked) { + logger.warn( + `User ${session.userId} lost access to workflow ${workflowId}; evicting socket ${socket.id}` + ) + emitOperationError( + { + type: 'ACCESS_REVOKED', + message: authorization.reason || 'Access to this workflow has been revoked', + operation, + target, + }, + { error: authorization.reason || 'Access revoked', retryable: false } + ) + await evictRevokedSocket(roomManager, socket, workflowId) + return + } + + if (!authorization.allowed) { logger.warn( - `User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}` + `User ${session.userId} (role: ${authorization.role}) forbidden from ${operation} on ${target}` ) emitOperationError({ type: 'INSUFFICIENT_PERMISSIONS', - message: `${permissionCheck.reason} on '${target}'`, + message: `${authorization.reason} on '${target}'`, operation, target, }) diff --git a/apps/realtime/src/handlers/subblocks.ts b/apps/realtime/src/handlers/subblocks.ts index 1ee35d3e722..8b7b44dbc85 100644 --- a/apps/realtime/src/handlers/subblocks.ts +++ b/apps/realtime/src/handlers/subblocks.ts @@ -6,8 +6,9 @@ import { getErrorMessage } from '@sim/utils/errors' import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz' import { isWorkflowBlockProtected } from '@sim/workflow-types/workflow' import { and, eq } from 'drizzle-orm' +import { evictRevokedSocket } from '@/handlers/eviction' import type { AuthenticatedSocket } from '@/middleware/auth' -import { checkRolePermission } from '@/middleware/permissions' +import { authorizeSocketOperation } from '@/middleware/permissions' import type { IRoomManager } from '@/rooms' const logger = createLogger('SubblocksHandlers') @@ -136,18 +137,44 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: return } - const permissionCheck = checkRolePermission(userPresence.role, SUBBLOCK_OPERATIONS.UPDATE) - if (!permissionCheck.allowed) { + const authorization = await authorizeSocketOperation({ + roomManager, + workflowId, + socketId: socket.id, + userId: session.userId, + presence: userPresence, + operation: SUBBLOCK_OPERATIONS.UPDATE, + }) + + if (authorization.accessRevoked) { + socket.emit('operation-forbidden', { + type: 'ACCESS_REVOKED', + message: authorization.reason || 'Access to this workflow has been revoked', + operation: SUBBLOCK_OPERATIONS.UPDATE, + target: 'subblock', + }) + if (operationId) { + socket.emit('operation-failed', { + operationId, + error: authorization.reason || 'Access revoked', + retryable: false, + }) + } + await evictRevokedSocket(roomManager, socket, workflowId) + return + } + + if (!authorization.allowed) { socket.emit('operation-forbidden', { type: 'INSUFFICIENT_PERMISSIONS', - message: permissionCheck.reason || 'Insufficient permissions', + message: authorization.reason || 'Insufficient permissions', operation: SUBBLOCK_OPERATIONS.UPDATE, target: 'subblock', }) if (operationId) { socket.emit('operation-failed', { operationId, - error: permissionCheck.reason || 'Insufficient permissions', + error: authorization.reason || 'Insufficient permissions', retryable: false, }) } diff --git a/apps/realtime/src/handlers/variables.ts b/apps/realtime/src/handlers/variables.ts index 41aeed3f83f..630575e8e1c 100644 --- a/apps/realtime/src/handlers/variables.ts +++ b/apps/realtime/src/handlers/variables.ts @@ -5,8 +5,9 @@ import { VARIABLE_OPERATIONS } from '@sim/realtime-protocol/constants' import { getErrorMessage } from '@sim/utils/errors' import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz' import { eq } from 'drizzle-orm' +import { evictRevokedSocket } from '@/handlers/eviction' import type { AuthenticatedSocket } from '@/middleware/auth' -import { checkRolePermission } from '@/middleware/permissions' +import { authorizeSocketOperation } from '@/middleware/permissions' import type { IRoomManager } from '@/rooms' const logger = createLogger('VariablesHandlers') @@ -124,18 +125,44 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: return } - const permissionCheck = checkRolePermission(userPresence.role, VARIABLE_OPERATIONS.UPDATE) - if (!permissionCheck.allowed) { + const authorization = await authorizeSocketOperation({ + roomManager, + workflowId, + socketId: socket.id, + userId: session.userId, + presence: userPresence, + operation: VARIABLE_OPERATIONS.UPDATE, + }) + + if (authorization.accessRevoked) { + socket.emit('operation-forbidden', { + type: 'ACCESS_REVOKED', + message: authorization.reason || 'Access to this workflow has been revoked', + operation: VARIABLE_OPERATIONS.UPDATE, + target: 'variable', + }) + if (operationId) { + socket.emit('operation-failed', { + operationId, + error: authorization.reason || 'Access revoked', + retryable: false, + }) + } + await evictRevokedSocket(roomManager, socket, workflowId) + return + } + + if (!authorization.allowed) { socket.emit('operation-forbidden', { type: 'INSUFFICIENT_PERMISSIONS', - message: permissionCheck.reason || 'Insufficient permissions', + message: authorization.reason || 'Insufficient permissions', operation: VARIABLE_OPERATIONS.UPDATE, target: 'variable', }) if (operationId) { socket.emit('operation-failed', { operationId, - error: permissionCheck.reason || 'Insufficient permissions', + error: authorization.reason || 'Insufficient permissions', retryable: false, }) } diff --git a/apps/realtime/src/handlers/workflow.ts b/apps/realtime/src/handlers/workflow.ts index da977fcdb5e..05ea8b8f41e 100644 --- a/apps/realtime/src/handlers/workflow.ts +++ b/apps/realtime/src/handlers/workflow.ts @@ -160,6 +160,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: joinedAt: Date.now(), lastActivity: Date.now(), role: userRole, + roleCheckedAt: Date.now(), avatarUrl, } diff --git a/apps/realtime/src/index.test.ts b/apps/realtime/src/index.test.ts index f2d6d8e6868..588ed7e0e47 100644 --- a/apps/realtime/src/index.test.ts +++ b/apps/realtime/src/index.test.ts @@ -73,6 +73,11 @@ vi.mock('@/middleware/permissions', () => ({ checkRolePermission: vi.fn().mockReturnValue({ allowed: true, }), + authorizeSocketOperation: vi.fn().mockResolvedValue({ + allowed: true, + role: 'admin', + accessRevoked: false, + }), })) vi.mock('@/database/operations', () => ({ diff --git a/apps/realtime/src/middleware/authorize-socket-operation.test.ts b/apps/realtime/src/middleware/authorize-socket-operation.test.ts new file mode 100644 index 00000000000..9b16783e0c6 --- /dev/null +++ b/apps/realtime/src/middleware/authorize-socket-operation.test.ts @@ -0,0 +1,248 @@ +/** + * @vitest-environment node + * + * Tests for `authorizeSocketOperation` — the per-event role re-validation that + * bounds how long a revoked or downgraded collaborator can keep mutating a + * workflow on an already-connected socket. + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockLimit, mockAuthorize } = vi.hoisted(() => ({ + mockLimit: vi.fn(), + mockAuthorize: vi.fn(), +})) + +vi.mock('@sim/db', () => ({ + db: { + select: () => ({ + from: () => ({ + where: () => ({ + limit: mockLimit, + }), + }), + }), + }, +})) + +vi.mock('@sim/db/schema', () => ({ + workflow: {}, +})) + +vi.mock('drizzle-orm', () => ({ + and: vi.fn(() => ({})), + eq: vi.fn(() => ({})), + isNull: vi.fn(() => ({})), +})) + +vi.mock('@sim/workflow-authz', () => ({ + authorizeWorkflowByWorkspacePermission: mockAuthorize, +})) + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), +})) + +import { authorizeSocketOperation } from '@/middleware/permissions' +import type { UserPresence } from '@/rooms/types' + +const WRITE_OP = 'add' // a write/admin operation (not in READ_OPERATIONS) +const READ_OP = 'update-position' // allowed for read role too + +function createManager() { + return { + updateUserRole: vi.fn().mockResolvedValue(undefined), + } +} + +function createPresence(overrides?: Partial): UserPresence { + return { + userId: 'user-1', + workflowId: 'workflow-1', + userName: 'Test User', + socketId: 'socket-1', + joinedAt: 1, + lastActivity: 1, + role: 'write', + roleCheckedAt: Date.now(), + ...overrides, + } +} + +/** Configures the live-permission lookup to grant `role`, or deny when null. */ +function grant(role: string | null) { + mockLimit.mockResolvedValue([{ workspaceId: 'ws-1', name: 'WF' }]) + if (role === null) { + mockAuthorize.mockResolvedValue({ allowed: false, message: 'denied' }) + } else { + mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: role }) + } +} + +describe('authorizeSocketOperation', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('uses the cached role without a DB read while the role is fresh', async () => { + const manager = createManager() + const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() }) + + const result = await authorizeSocketOperation({ + roomManager: manager as never, + workflowId: 'workflow-1', + socketId: 'socket-1', + userId: 'user-1', + presence, + operation: WRITE_OP, + }) + + expect(result).toEqual({ + allowed: true, + role: 'write', + reason: undefined, + accessRevoked: false, + }) + expect(mockAuthorize).not.toHaveBeenCalled() + expect(mockLimit).not.toHaveBeenCalled() + expect(manager.updateUserRole).not.toHaveBeenCalled() + }) + + it('re-validates against the DB once the cached role is stale', async () => { + grant('write') + const manager = createManager() + const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) + + const result = await authorizeSocketOperation({ + roomManager: manager as never, + workflowId: 'workflow-1', + socketId: 'socket-1', + userId: 'user-1', + presence, + operation: WRITE_OP, + }) + + expect(result.allowed).toBe(true) + expect(result.accessRevoked).toBe(false) + expect(mockAuthorize).toHaveBeenCalledTimes(1) + expect(manager.updateUserRole).toHaveBeenCalledWith('workflow-1', 'socket-1', 'write') + }) + + it('treats missing roleCheckedAt (pre-upgrade presence) as stale and re-validates', async () => { + grant('write') + const manager = createManager() + const presence = createPresence({ role: 'write' }) + presence.roleCheckedAt = undefined + + const result = await authorizeSocketOperation({ + roomManager: manager as never, + workflowId: 'workflow-1', + socketId: 'socket-1', + userId: 'user-1', + presence, + operation: WRITE_OP, + }) + + expect(result.allowed).toBe(true) + expect(mockAuthorize).toHaveBeenCalledTimes(1) + }) + + it('denies a write after a downgrade to read and refreshes the cached role', async () => { + grant('read') + const manager = createManager() + const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) + + const result = await authorizeSocketOperation({ + roomManager: manager as never, + workflowId: 'workflow-1', + socketId: 'socket-1', + userId: 'user-1', + presence, + operation: WRITE_OP, + }) + + expect(result.allowed).toBe(false) + expect(result.accessRevoked).toBe(false) + expect(result.role).toBe('read') + expect(manager.updateUserRole).toHaveBeenCalledWith('workflow-1', 'socket-1', 'read') + }) + + it('still allows a read-tier op after a downgrade to read', async () => { + grant('read') + const manager = createManager() + const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) + + const result = await authorizeSocketOperation({ + roomManager: manager as never, + workflowId: 'workflow-1', + socketId: 'socket-1', + userId: 'user-1', + presence, + operation: READ_OP, + }) + + expect(result.allowed).toBe(true) + expect(result.role).toBe('read') + }) + + it('reports accessRevoked when workspace permission has been removed', async () => { + grant(null) + const manager = createManager() + const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) + + const result = await authorizeSocketOperation({ + roomManager: manager as never, + workflowId: 'workflow-1', + socketId: 'socket-1', + userId: 'user-1', + presence, + operation: WRITE_OP, + }) + + expect(result.accessRevoked).toBe(true) + expect(result.allowed).toBe(false) + expect(manager.updateUserRole).not.toHaveBeenCalled() + }) + + it('reports accessRevoked when the workflow no longer exists', async () => { + mockLimit.mockResolvedValue([]) + const manager = createManager() + const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) + + const result = await authorizeSocketOperation({ + roomManager: manager as never, + workflowId: 'workflow-1', + socketId: 'socket-1', + userId: 'user-1', + presence, + operation: WRITE_OP, + }) + + expect(result.accessRevoked).toBe(true) + expect(mockAuthorize).not.toHaveBeenCalled() + }) + + it('falls back to the cached role on a transient DB error (no lockout)', async () => { + mockLimit.mockRejectedValue(new Error('connection reset')) + const manager = createManager() + const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) + + const result = await authorizeSocketOperation({ + roomManager: manager as never, + workflowId: 'workflow-1', + socketId: 'socket-1', + userId: 'user-1', + presence, + operation: WRITE_OP, + }) + + expect(result.allowed).toBe(true) + expect(result.accessRevoked).toBe(false) + expect(result.role).toBe('write') + expect(manager.updateUserRole).not.toHaveBeenCalled() + }) +}) diff --git a/apps/realtime/src/middleware/permissions.ts b/apps/realtime/src/middleware/permissions.ts index 26a4d8c8542..25435b69a5c 100644 --- a/apps/realtime/src/middleware/permissions.ts +++ b/apps/realtime/src/middleware/permissions.ts @@ -13,9 +13,17 @@ import { } from '@sim/realtime-protocol/constants' import { authorizeWorkflowByWorkspacePermission } from '@sim/workflow-authz' import { and, eq, isNull } from 'drizzle-orm' +import type { IRoomManager, UserPresence } from '@/rooms/types' const logger = createLogger('SocketPermissions') +/** + * How long a cached role is trusted before it must be re-verified against the + * live `permissions` table. Bounds the window in which a revoked or downgraded + * collaborator can keep acting on a stale role on an already-connected socket. + */ +const ROLE_REVALIDATION_TTL_MS = 15_000 + // Admin-only operations (require admin role) const ADMIN_ONLY_OPERATIONS: string[] = [BLOCKS_OPERATIONS.BATCH_TOGGLE_LOCKED] @@ -83,6 +91,63 @@ export function checkRolePermission( return { allowed: true } } +/** + * Authorizes a mutating socket operation against the caller's *current* role. + * + * The role cached in presence is trusted only for `ROLE_REVALIDATION_TTL_MS`; + * once stale it is re-verified against the live `permissions` table and the + * refreshed role is written back to presence. This bounds how long a revoked or + * downgraded collaborator can keep mutating a workflow on an already-connected + * socket, complementing the push-based eviction triggered by the main app. + * + * Transient database failures during re-validation fall back to the last known + * role (without refreshing the timestamp, so the next operation retries) rather + * than locking out legitimate users during a blip. + */ +export async function authorizeSocketOperation(params: { + roomManager: IRoomManager + workflowId: string + socketId: string + userId: string + presence: UserPresence + operation: string +}): Promise<{ allowed: boolean; role: string; reason?: string; accessRevoked: boolean }> { + const { roomManager, workflowId, socketId, userId, presence, operation } = params + + let role = presence.role + const lastChecked = presence.roleCheckedAt ?? 0 + const isStale = Date.now() - lastChecked >= ROLE_REVALIDATION_TTL_MS + + if (isStale) { + try { + const access = await verifyWorkflowAccess(userId, workflowId) + if (!access.hasAccess) { + return { + allowed: false, + role, + reason: 'Access to this workflow has been revoked', + accessRevoked: true, + } + } + role = access.role || 'read' + await roomManager.updateUserRole(workflowId, socketId, role) + } catch (error) { + logger.warn( + `Failed to re-validate role for user ${userId} on workflow ${workflowId}; reusing cached role`, + error + ) + } + } + + const permissionCheck = checkRolePermission(role, operation) + return { + allowed: permissionCheck.allowed, + role, + reason: permissionCheck.reason, + accessRevoked: false, + } +} + /** * Verifies a user's access to a workflow via workspace permissions. * diff --git a/apps/realtime/src/rooms/access.test.ts b/apps/realtime/src/rooms/access.test.ts new file mode 100644 index 00000000000..9b79cf9e8e8 --- /dev/null +++ b/apps/realtime/src/rooms/access.test.ts @@ -0,0 +1,162 @@ +/** + * @vitest-environment node + * + * Tests for `reconcileWorkspaceAccessChange` — the push-driven reconciliation + * that evicts users whose workspace access was revoked and refreshes the cached + * role of users who were merely downgraded. + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockWhere, mockVerifyWorkflowAccess } = vi.hoisted(() => ({ + mockWhere: vi.fn(), + mockVerifyWorkflowAccess: vi.fn(), +})) + +vi.mock('@sim/db', () => ({ + db: { + select: () => ({ + from: () => ({ + where: mockWhere, + }), + }), + }, +})) + +vi.mock('@sim/db/schema', () => ({ + workflow: {}, +})) + +vi.mock('drizzle-orm', () => ({ + and: vi.fn(() => ({})), + eq: vi.fn(() => ({})), + isNull: vi.fn(() => ({})), +})) + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), +})) + +vi.mock('@/middleware/permissions', () => ({ + verifyWorkflowAccess: mockVerifyWorkflowAccess, +})) + +import { reconcileWorkspaceAccessChange } from '@/rooms/access' +import type { UserPresence } from '@/rooms/types' + +function presence(socketId: string, userId: string, role = 'write'): UserPresence { + return { + userId, + workflowId: 'workflow-1', + userName: userId, + socketId, + joinedAt: 1, + lastActivity: 1, + role, + } +} + +function createManager(users: UserPresence[]) { + const socketsLeave = vi.fn().mockResolvedValue(undefined) + const emit = vi.fn() + return { + socketsLeave, + emit, + hasWorkflowRoom: vi.fn().mockResolvedValue(true), + getWorkflowUsers: vi.fn().mockResolvedValue(users), + updateUserRole: vi.fn().mockResolvedValue(undefined), + removeUserFromRoom: vi.fn().mockResolvedValue('workflow-1'), + broadcastPresenceUpdate: vi.fn().mockResolvedValue(undefined), + io: { + to: vi.fn(() => ({ emit })), + in: vi.fn(() => ({ socketsLeave })), + }, + } +} + +describe('reconcileWorkspaceAccessChange', () => { + beforeEach(() => { + vi.clearAllMocks() + mockWhere.mockResolvedValue([{ id: 'workflow-1' }]) + }) + + it('evicts every socket of a user whose access was revoked', async () => { + mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: false }) + const manager = createManager([presence('socket-1', 'user-1'), presence('socket-2', 'user-1')]) + + await reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') + + expect(manager.io.to).toHaveBeenCalledWith('socket-1') + expect(manager.io.to).toHaveBeenCalledWith('socket-2') + expect(manager.emit).toHaveBeenCalledWith( + 'workflow-permissions-revoked', + expect.objectContaining({ workflowId: 'workflow-1' }) + ) + expect(manager.removeUserFromRoom).toHaveBeenCalledWith('socket-1', 'workflow-1') + expect(manager.removeUserFromRoom).toHaveBeenCalledWith('socket-2', 'workflow-1') + expect(manager.socketsLeave).toHaveBeenCalledTimes(2) + expect(manager.updateUserRole).not.toHaveBeenCalled() + expect(manager.broadcastPresenceUpdate).toHaveBeenCalledWith('workflow-1') + }) + + it('refreshes the cached role without eviction on a downgrade', async () => { + mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: true, role: 'read' }) + const manager = createManager([presence('socket-1', 'user-1', 'write')]) + + await reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') + + expect(manager.updateUserRole).toHaveBeenCalledWith('workflow-1', 'socket-1', 'read') + expect(manager.removeUserFromRoom).not.toHaveBeenCalled() + expect(manager.socketsLeave).not.toHaveBeenCalled() + expect(manager.emit).not.toHaveBeenCalled() + expect(manager.broadcastPresenceUpdate).toHaveBeenCalledWith('workflow-1') + }) + + it('does nothing for rooms where the user is not present', async () => { + const manager = createManager([presence('socket-9', 'someone-else')]) + + await reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') + + expect(mockVerifyWorkflowAccess).not.toHaveBeenCalled() + expect(manager.updateUserRole).not.toHaveBeenCalled() + expect(manager.removeUserFromRoom).not.toHaveBeenCalled() + }) + + it('skips workflows that have no active room', async () => { + const manager = createManager([presence('socket-1', 'user-1')]) + manager.hasWorkflowRoom.mockResolvedValue(false) + + await reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') + + expect(manager.getWorkflowUsers).not.toHaveBeenCalled() + expect(mockVerifyWorkflowAccess).not.toHaveBeenCalled() + }) + + it('continues to other workflows when one room fails', async () => { + mockWhere.mockResolvedValue([{ id: 'workflow-1' }, { id: 'workflow-2' }]) + mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: false }) + const manager = createManager([presence('socket-1', 'user-1')]) + manager.getWorkflowUsers + .mockRejectedValueOnce(new Error('redis blip')) + .mockResolvedValueOnce([presence('socket-1', 'user-1')]) + + await reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') + + // First workflow threw; second still evicted. + expect(manager.removeUserFromRoom).toHaveBeenCalledWith('socket-1', 'workflow-2') + }) + + it('returns without throwing when the workspace workflow lookup fails', async () => { + mockWhere.mockRejectedValue(new Error('db down')) + const manager = createManager([presence('socket-1', 'user-1')]) + + await expect( + reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') + ).resolves.toBeUndefined() + expect(manager.hasWorkflowRoom).not.toHaveBeenCalled() + }) +}) diff --git a/apps/realtime/src/rooms/access.ts b/apps/realtime/src/rooms/access.ts new file mode 100644 index 00000000000..b7ae62dd83d --- /dev/null +++ b/apps/realtime/src/rooms/access.ts @@ -0,0 +1,79 @@ +import { db } from '@sim/db' +import { workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, isNull } from 'drizzle-orm' +import { verifyWorkflowAccess } from '@/middleware/permissions' +import type { IRoomManager } from '@/rooms/types' + +const logger = createLogger('RoomAccess') + +/** + * Reconciles active realtime rooms with a user's current workspace permission. + * + * For every non-archived workflow in the workspace that has an active room + * containing the user, the user's access is re-verified against the live + * `permissions` table: + * - If access was fully revoked, every one of the user's sockets is evicted from + * the room (works cross-pod via the Redis adapter) so they immediately stop + * receiving and persisting edits. + * - If access was merely downgraded, the cached role is refreshed in place so + * subsequent operations are authorized against the new role without waiting for + * the per-presence revalidation TTL to elapse. + */ +export async function reconcileWorkspaceAccessChange( + manager: IRoomManager, + workspaceId: string, + userId: string +): Promise { + let workflows: { id: string }[] + try { + workflows = await db + .select({ id: workflow.id }) + .from(workflow) + .where(and(eq(workflow.workspaceId, workspaceId), isNull(workflow.archivedAt))) + } catch (error) { + logger.error(`Failed to load workflows for workspace ${workspaceId} access change`, error) + return + } + + for (const { id: workflowId } of workflows) { + try { + const hasRoom = await manager.hasWorkflowRoom(workflowId) + if (!hasRoom) continue + + const users = await manager.getWorkflowUsers(workflowId) + const targets = users.filter((u) => u.userId === userId) + if (targets.length === 0) continue + + const access = await verifyWorkflowAccess(userId, workflowId) + + if (access.hasAccess) { + const role = access.role || 'read' + for (const target of targets) { + await manager.updateUserRole(workflowId, target.socketId, role) + } + await manager.broadcastPresenceUpdate(workflowId) + logger.info( + `Refreshed cached role to '${role}' for user ${userId} on workflow ${workflowId}` + ) + continue + } + + for (const target of targets) { + manager.io.to(target.socketId).emit('workflow-permissions-revoked', { + workflowId, + message: 'Your access to this workflow has been revoked', + timestamp: Date.now(), + }) + await manager.removeUserFromRoom(target.socketId, workflowId) + await manager.io.in(target.socketId).socketsLeave(workflowId) + } + await manager.broadcastPresenceUpdate(workflowId) + logger.info( + `Evicted ${targets.length} socket(s) for revoked user ${userId} from workflow ${workflowId}` + ) + } catch (error) { + logger.error(`Failed to reconcile access for user ${userId} on workflow ${workflowId}`, error) + } + } +} diff --git a/apps/realtime/src/rooms/memory-manager.ts b/apps/realtime/src/rooms/memory-manager.ts index a032e785bb5..e837657fd1c 100644 --- a/apps/realtime/src/rooms/memory-manager.ts +++ b/apps/realtime/src/rooms/memory-manager.ts @@ -1,5 +1,6 @@ import { createLogger } from '@sim/logger' import type { Server } from 'socket.io' +import { reconcileWorkspaceAccessChange } from '@/rooms/access' import type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/rooms/types' const logger = createLogger('MemoryRoomManager') @@ -126,6 +127,17 @@ export class MemoryRoomManager implements IRoomManager { } } + async updateUserRole(workflowId: string, socketId: string, role: string): Promise { + const room = this.workflowRooms.get(workflowId) + if (!room) return + + const presence = room.users.get(socketId) + if (presence) { + presence.role = role + presence.roleCheckedAt = Date.now() + } + } + async updateRoomLastModified(workflowId: string): Promise { const room = this.workflowRooms.get(workflowId) if (room) { @@ -255,4 +267,9 @@ export class MemoryRoomManager implements IRoomManager { logger.info(`Notified ${room.users.size} users about workflow deployment change: ${workflowId}`) } + + async handleWorkspaceAccessChange(workspaceId: string, userId: string): Promise { + logger.info(`Handling workspace access change for user ${userId} in workspace ${workspaceId}`) + await reconcileWorkspaceAccessChange(this, workspaceId, userId) + } } diff --git a/apps/realtime/src/rooms/redis-manager.ts b/apps/realtime/src/rooms/redis-manager.ts index 0e6b3eadf2b..653c0b467de 100644 --- a/apps/realtime/src/rooms/redis-manager.ts +++ b/apps/realtime/src/rooms/redis-manager.ts @@ -1,6 +1,7 @@ import { createLogger } from '@sim/logger' import { createClient, type RedisClientType } from 'redis' import type { Server } from 'socket.io' +import { reconcileWorkspaceAccessChange } from '@/rooms/access' import type { IRoomManager, UserPresence, UserSession } from '@/rooms/types' const logger = createLogger('RedisRoomManager') @@ -96,6 +97,30 @@ redis.call('EXPIRE', socketPresenceWorkflowKey, presenceWorkflowTtl) return 1 ` +/** + * Lua script for atomically updating a user's cached role and verification + * timestamp. Read-modify-write under a single script so a concurrent activity + * update on the same presence field cannot clobber the refreshed role. + */ +const UPDATE_ROLE_SCRIPT = ` +local workflowUsersKey = KEYS[1] +local socketId = ARGV[1] +local role = ARGV[2] +local roleCheckedAt = ARGV[3] + +local existingJson = redis.call('HGET', workflowUsersKey, socketId) +if not existingJson then + return 0 +end + +local existing = cjson.decode(existingJson) +existing.role = role +existing.roleCheckedAt = tonumber(roleCheckedAt) + +redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing)) +return 1 +` + /** * Redis-backed room manager for multi-pod deployments. * Uses Lua scripts for atomic operations to prevent race conditions. @@ -106,6 +131,7 @@ export class RedisRoomManager implements IRoomManager { private isConnected = false private removeUserScriptSha: string | null = null private updateActivityScriptSha: string | null = null + private updateRoleScriptSha: string | null = null constructor(io: Server, redisUrl: string) { this._io = io @@ -151,6 +177,7 @@ export class RedisRoomManager implements IRoomManager { // Pre-load Lua scripts for better performance this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT) this.updateActivityScriptSha = await this.redis.scriptLoad(UPDATE_ACTIVITY_SCRIPT) + this.updateRoleScriptSha = await this.redis.scriptLoad(UPDATE_ROLE_SCRIPT) logger.info('RedisRoomManager connected to Redis and scripts loaded') } catch (error) { @@ -331,6 +358,32 @@ export class RedisRoomManager implements IRoomManager { } } + async updateUserRole( + workflowId: string, + socketId: string, + role: string, + retried = false + ): Promise { + if (!this.updateRoleScriptSha) { + logger.error('updateUserRole called before initialize()') + return + } + + try { + await this.redis.evalSha(this.updateRoleScriptSha, { + keys: [KEYS.workflowUsers(workflowId)], + arguments: [socketId, role, Date.now().toString()], + }) + } catch (error) { + if ((error as Error).message?.includes('NOSCRIPT') && !retried) { + logger.warn('Lua script not found, reloading...') + this.updateRoleScriptSha = await this.redis.scriptLoad(UPDATE_ROLE_SCRIPT) + return this.updateUserRole(workflowId, socketId, role, true) + } + logger.error(`Failed to update user role: ${socketId}`, error) + } + } + async updateRoomLastModified(workflowId: string): Promise { await this.redis.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString()) } @@ -457,4 +510,9 @@ export class RedisRoomManager implements IRoomManager { const userCount = await this.getUniqueUserCount(workflowId) logger.info(`Notified ${userCount} users about workflow deployment change: ${workflowId}`) } + + async handleWorkspaceAccessChange(workspaceId: string, userId: string): Promise { + logger.info(`Handling workspace access change for user ${userId} in workspace ${workspaceId}`) + await reconcileWorkspaceAccessChange(this, workspaceId, userId) + } } diff --git a/apps/realtime/src/rooms/types.ts b/apps/realtime/src/rooms/types.ts index 9553a427e1e..b9da9cf7e49 100644 --- a/apps/realtime/src/rooms/types.ts +++ b/apps/realtime/src/rooms/types.ts @@ -12,6 +12,12 @@ export interface UserPresence { joinedAt: number lastActivity: number role: string + /** + * Timestamp (ms) of the last time `role` was verified against the live + * `permissions` table. Used to bound how long a revoked or downgraded + * collaborator can keep acting on a stale, cached role. + */ + roleCheckedAt?: number cursor?: { x: number; y: number } selection?: { type: 'block' | 'edge' | 'none'; id?: string } avatarUrl?: string | null @@ -99,6 +105,12 @@ export interface IRoomManager { updates: Partial> ): Promise + /** + * Update a user's cached workspace role and refresh its verification + * timestamp. Called after re-validating access against the database. + */ + updateUserRole(workflowId: string, socketId: string, role: string): Promise + /** * Update room's lastModified timestamp */ @@ -143,4 +155,11 @@ export interface IRoomManager { * Handle workflow deployment change - notify users to refresh deployment state */ handleWorkflowDeployed(workflowId: string): Promise + + /** + * Handle a workspace permission change for a user. Evicts the user from any + * active workflow rooms in the workspace where their access has been revoked, + * and refreshes their cached role where access was merely downgraded. + */ + handleWorkspaceAccessChange(workspaceId: string, userId: string): Promise } diff --git a/apps/realtime/src/routes/http.ts b/apps/realtime/src/routes/http.ts index 0f8ed73cc52..3ecfcbd6650 100644 --- a/apps/realtime/src/routes/http.ts +++ b/apps/realtime/src/routes/http.ts @@ -150,6 +150,25 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) { return } + // Handle workspace permission change notifications from the main API. + // Evicts users whose access was revoked and refreshes downgraded roles. + if (req.method === 'POST' && req.url === '/api/permissions-updated') { + try { + const body = await readRequestBody(req) + const { workspaceId, userId } = JSON.parse(body) + if (typeof workspaceId !== 'string' || typeof userId !== 'string') { + sendError(res, 'workspaceId and userId are required', 400) + return + } + await roomManager.handleWorkspaceAccessChange(workspaceId, userId) + sendSuccess(res) + } catch (error) { + logger.error('Error handling permissions update notification:', error) + sendError(res, 'Failed to process permissions update notification') + } + return + } + res.writeHead(404, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ error: 'Not found' })) } diff --git a/apps/realtime/src/routes/permissions-updated.test.ts b/apps/realtime/src/routes/permissions-updated.test.ts new file mode 100644 index 00000000000..5ecd8751b30 --- /dev/null +++ b/apps/realtime/src/routes/permissions-updated.test.ts @@ -0,0 +1,115 @@ +/** + * @vitest-environment node + * + * Tests for the `/api/permissions-updated` HTTP endpoint that the main app calls + * to reconcile realtime rooms after a workspace permission change. + */ +import type { IncomingMessage, ServerResponse } from 'node:http' +import { Readable } from 'node:stream' +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { createHttpHandler } from '@/routes/http' + +const API_KEY = 'test-internal-api-secret-at-least-32-chars' + +function createLogger() { + return { info: vi.fn(), error: vi.fn(), debug: vi.fn(), warn: vi.fn() } +} + +function createRoomManager(overrides?: Record) { + return { + isReady: vi.fn().mockReturnValue(true), + handleWorkspaceAccessChange: vi.fn().mockResolvedValue(undefined), + ...overrides, + } +} + +function createRequest(body: unknown, headers: Record = {}): IncomingMessage { + const req = Readable.from([ + typeof body === 'string' ? body : JSON.stringify(body), + ]) as unknown as IncomingMessage + req.method = 'POST' + req.url = '/api/permissions-updated' + req.headers = headers + return req +} + +function createResponse() { + const res = { + statusCode: 0, + body: '', + writeHead: vi.fn((status: number) => { + res.statusCode = status + return res + }), + end: vi.fn((chunk?: string) => { + if (chunk) res.body = chunk + }), + } + return res as unknown as ServerResponse & { statusCode: number; body: string } +} + +async function invoke(roomManager: ReturnType, req: IncomingMessage) { + const res = createResponse() + const handler = createHttpHandler(roomManager as never, createLogger()) + await handler(req, res) + return res +} + +describe('POST /api/permissions-updated', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('rejects requests without a valid internal API key', async () => { + const roomManager = createRoomManager() + const res = await invoke(roomManager, createRequest({ workspaceId: 'ws-1', userId: 'user-1' })) + + expect(res.statusCode).toBe(401) + expect(roomManager.handleWorkspaceAccessChange).not.toHaveBeenCalled() + }) + + it('returns 400 when workspaceId or userId is missing', async () => { + const roomManager = createRoomManager() + const res = await invoke( + roomManager, + createRequest({ workspaceId: 'ws-1' }, { 'x-api-key': API_KEY }) + ) + + expect(res.statusCode).toBe(400) + expect(roomManager.handleWorkspaceAccessChange).not.toHaveBeenCalled() + }) + + it('delegates to handleWorkspaceAccessChange on a valid request', async () => { + const roomManager = createRoomManager() + const res = await invoke( + roomManager, + createRequest({ workspaceId: 'ws-1', userId: 'user-1' }, { 'x-api-key': API_KEY }) + ) + + expect(res.statusCode).toBe(200) + expect(roomManager.handleWorkspaceAccessChange).toHaveBeenCalledWith('ws-1', 'user-1') + }) + + it('returns 503 when the room manager is not ready', async () => { + const roomManager = createRoomManager({ isReady: vi.fn().mockReturnValue(false) }) + const res = await invoke( + roomManager, + createRequest({ workspaceId: 'ws-1', userId: 'user-1' }, { 'x-api-key': API_KEY }) + ) + + expect(res.statusCode).toBe(503) + expect(roomManager.handleWorkspaceAccessChange).not.toHaveBeenCalled() + }) + + it('returns 500 when reconciliation throws', async () => { + const roomManager = createRoomManager({ + handleWorkspaceAccessChange: vi.fn().mockRejectedValue(new Error('boom')), + }) + const res = await invoke( + roomManager, + createRequest({ workspaceId: 'ws-1', userId: 'user-1' }, { 'x-api-key': API_KEY }) + ) + + expect(res.statusCode).toBe(500) + }) +}) diff --git a/apps/sim/app/api/workspaces/[id]/permissions/route.ts b/apps/sim/app/api/workspaces/[id]/permissions/route.ts index 8e9bbd5bf32..bf4d13856d7 100644 --- a/apps/sim/app/api/workspaces/[id]/permissions/route.ts +++ b/apps/sim/app/api/workspaces/[id]/permissions/route.ts @@ -11,6 +11,7 @@ import { getSession } from '@/lib/auth' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { syncWorkspaceEnvCredentials } from '@/lib/credentials/environment' import { captureServerEvent } from '@/lib/posthog/server' +import { notifyWorkspaceAccessChanged } from '@/lib/workspaces/permissions/realtime' import { checkWorkspaceAccess, getUserEntityPermissions, @@ -196,6 +197,14 @@ export const PATCH = withRouteHandler( }) } + // Reconcile active realtime rooms so downgraded collaborators lose write + // access promptly instead of riding their cached role until disconnect. + await Promise.all( + body.updates + .filter((update) => permLookup.get(update.userId)?.permission !== update.permissions) + .map((update) => notifyWorkspaceAccessChanged(workspaceId, update.userId)) + ) + const updatedUsers = await getUsersWithPermissions(workspaceId) for (const update of body.updates) { diff --git a/apps/sim/app/api/workspaces/members/[id]/route.ts b/apps/sim/app/api/workspaces/members/[id]/route.ts index 8679aea74c5..ce18f3f306d 100644 --- a/apps/sim/app/api/workspaces/members/[id]/route.ts +++ b/apps/sim/app/api/workspaces/members/[id]/route.ts @@ -12,6 +12,7 @@ import { reconcileOrganizationSeats } from '@/lib/billing/organizations/seats' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { revokeWorkspaceCredentialMembershipsTx } from '@/lib/credentials/access' import { captureServerEvent } from '@/lib/posthog/server' +import { notifyWorkspaceAccessChanged } from '@/lib/workspaces/permissions/realtime' import { hasWorkspaceAdminAccess } from '@/lib/workspaces/permissions/utils' import { reassignWorkflowOwnershipForWorkspaceMemberRemovalTx, @@ -155,6 +156,10 @@ export const DELETE = withRouteHandler( } ) + // Evict the removed user from any active realtime workflow rooms so their + // live read/write access ends immediately, not just on the next REST call. + await notifyWorkspaceAccessChanged(workspaceId, userId) + /** * Seats are tied to organization membership (one per member), so a * single-workspace removal only drops a seat when it leaves the member diff --git a/apps/sim/lib/workspaces/permissions/realtime.ts b/apps/sim/lib/workspaces/permissions/realtime.ts new file mode 100644 index 00000000000..60c2d7fc8fc --- /dev/null +++ b/apps/sim/lib/workspaces/permissions/realtime.ts @@ -0,0 +1,40 @@ +import { createLogger } from '@sim/logger' +import { env } from '@/lib/core/config/env' +import { getSocketServerUrl } from '@/lib/core/utils/urls' + +const logger = createLogger('WorkspacePermissionsRealtime') + +/** + * Notifies the realtime server that a user's workspace permission changed so it + * can reconcile any active workflow rooms — evicting the user where access was + * revoked and refreshing their cached role where it was downgraded. + * + * Best-effort: failures are logged but never block the permission mutation, and + * the realtime server independently re-validates cached roles on a short TTL. + */ +export async function notifyWorkspaceAccessChanged( + workspaceId: string, + userId: string +): Promise { + try { + const response = await fetch(`${getSocketServerUrl()}/api/permissions-updated`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': env.INTERNAL_API_SECRET, + }, + body: JSON.stringify({ workspaceId, userId }), + }) + + if (!response.ok) { + logger.warn( + `Failed to notify realtime of access change for user ${userId} in workspace ${workspaceId} (${response.status})` + ) + } + } catch (error) { + logger.warn( + `Error notifying realtime of access change for user ${userId} in workspace ${workspaceId}`, + { error } + ) + } +}