diff --git a/apps/realtime/src/handlers/eviction.ts b/apps/realtime/src/handlers/eviction.ts deleted file mode 100644 index cfb69f935a3..00000000000 --- a/apps/realtime/src/handlers/eviction.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { createLogger } from '@sim/logger' -import type { AuthenticatedSocket } from '@/middleware/auth' -import type { IRoomManager } from '@/rooms' - -const logger = createLogger('SocketEviction') - -/** - * Removes the calling socket from a workflow room after its access has been - * revoked mid-session, so it immediately stops receiving broadcasts and can no - * longer mutate workflow state. - */ -export async function evictRevokedSocket( - roomManager: IRoomManager, - socket: AuthenticatedSocket, - workflowId: string -): Promise { - try { - socket.leave(workflowId) - await roomManager.removeUserFromRoom(socket.id, workflowId) - await roomManager.broadcastPresenceUpdate(workflowId) - } catch (error) { - logger.error(`Failed to evict revoked socket ${socket.id} from workflow ${workflowId}`, error) - } -} diff --git a/apps/realtime/src/handlers/operations.ts b/apps/realtime/src/handlers/operations.ts index cb0298e6509..3fa4ac0f8e7 100644 --- a/apps/realtime/src/handlers/operations.ts +++ b/apps/realtime/src/handlers/operations.ts @@ -14,9 +14,8 @@ import { generateId } from '@sim/utils/id' import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz' import { ZodError } from 'zod' import { persistWorkflowOperation } from '@/database/operations' -import { evictRevokedSocket } from '@/handlers/eviction' import type { AuthenticatedSocket } from '@/middleware/auth' -import { authorizeSocketOperation } from '@/middleware/permissions' +import { checkRolePermission } from '@/middleware/permissions' import type { IRoomManager, UserSession } from '@/rooms' const logger = createLogger('OperationsHandlers') @@ -126,42 +125,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() }) - // Re-validate the cached role against the live permissions table (bounded - // by a short TTL) so a revoked or downgraded collaborator cannot keep - // mutating the workflow on an already-connected socket. - const authorization = await authorizeSocketOperation({ - roomManager, - workflowId, - socketId: socket.id, - userId: session.userId, - presence: userPresence, - operation, - }) - - if (authorization.accessRevoked) { - logger.warn( - `User ${session.userId} lost access to workflow ${workflowId}; evicting socket ${socket.id}` - ) - emitOperationError( - { - type: 'ACCESS_REVOKED', - message: authorization.reason || 'Access to this workflow has been revoked', - operation, - target, - }, - { error: authorization.reason || 'Access revoked', retryable: false } - ) - await evictRevokedSocket(roomManager, socket, workflowId) - return - } - - if (!authorization.allowed) { + // Check permissions using cached role (no DB query) + const permissionCheck = checkRolePermission(userPresence.role, operation) + if (!permissionCheck.allowed) { logger.warn( - `User ${session.userId} (role: ${authorization.role}) forbidden from ${operation} on ${target}` + `User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}` ) emitOperationError({ type: 'INSUFFICIENT_PERMISSIONS', - message: `${authorization.reason} on '${target}'`, + message: `${permissionCheck.reason} on '${target}'`, operation, target, }) diff --git a/apps/realtime/src/handlers/subblocks.ts b/apps/realtime/src/handlers/subblocks.ts index 8b7b44dbc85..1ee35d3e722 100644 --- a/apps/realtime/src/handlers/subblocks.ts +++ b/apps/realtime/src/handlers/subblocks.ts @@ -6,9 +6,8 @@ import { getErrorMessage } from '@sim/utils/errors' import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz' import { isWorkflowBlockProtected } from '@sim/workflow-types/workflow' import { and, eq } from 'drizzle-orm' -import { evictRevokedSocket } from '@/handlers/eviction' import type { AuthenticatedSocket } from '@/middleware/auth' -import { authorizeSocketOperation } from '@/middleware/permissions' +import { checkRolePermission } from '@/middleware/permissions' import type { IRoomManager } from '@/rooms' const logger = createLogger('SubblocksHandlers') @@ -137,44 +136,18 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: return } - const authorization = await authorizeSocketOperation({ - roomManager, - workflowId, - socketId: socket.id, - userId: session.userId, - presence: userPresence, - operation: SUBBLOCK_OPERATIONS.UPDATE, - }) - - if (authorization.accessRevoked) { - socket.emit('operation-forbidden', { - type: 'ACCESS_REVOKED', - message: authorization.reason || 'Access to this workflow has been revoked', - operation: SUBBLOCK_OPERATIONS.UPDATE, - target: 'subblock', - }) - if (operationId) { - socket.emit('operation-failed', { - operationId, - error: authorization.reason || 'Access revoked', - retryable: false, - }) - } - await evictRevokedSocket(roomManager, socket, workflowId) - return - } - - if (!authorization.allowed) { + const permissionCheck = checkRolePermission(userPresence.role, SUBBLOCK_OPERATIONS.UPDATE) + if (!permissionCheck.allowed) { socket.emit('operation-forbidden', { type: 'INSUFFICIENT_PERMISSIONS', - message: authorization.reason || 'Insufficient permissions', + message: permissionCheck.reason || 'Insufficient permissions', operation: SUBBLOCK_OPERATIONS.UPDATE, target: 'subblock', }) if (operationId) { socket.emit('operation-failed', { operationId, - error: authorization.reason || 'Insufficient permissions', + error: permissionCheck.reason || 'Insufficient permissions', retryable: false, }) } diff --git a/apps/realtime/src/handlers/variables.ts b/apps/realtime/src/handlers/variables.ts index 630575e8e1c..41aeed3f83f 100644 --- a/apps/realtime/src/handlers/variables.ts +++ b/apps/realtime/src/handlers/variables.ts @@ -5,9 +5,8 @@ import { VARIABLE_OPERATIONS } from '@sim/realtime-protocol/constants' import { getErrorMessage } from '@sim/utils/errors' import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz' import { eq } from 'drizzle-orm' -import { evictRevokedSocket } from '@/handlers/eviction' import type { AuthenticatedSocket } from '@/middleware/auth' -import { authorizeSocketOperation } from '@/middleware/permissions' +import { checkRolePermission } from '@/middleware/permissions' import type { IRoomManager } from '@/rooms' const logger = createLogger('VariablesHandlers') @@ -125,44 +124,18 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: return } - const authorization = await authorizeSocketOperation({ - roomManager, - workflowId, - socketId: socket.id, - userId: session.userId, - presence: userPresence, - operation: VARIABLE_OPERATIONS.UPDATE, - }) - - if (authorization.accessRevoked) { - socket.emit('operation-forbidden', { - type: 'ACCESS_REVOKED', - message: authorization.reason || 'Access to this workflow has been revoked', - operation: VARIABLE_OPERATIONS.UPDATE, - target: 'variable', - }) - if (operationId) { - socket.emit('operation-failed', { - operationId, - error: authorization.reason || 'Access revoked', - retryable: false, - }) - } - await evictRevokedSocket(roomManager, socket, workflowId) - return - } - - if (!authorization.allowed) { + const permissionCheck = checkRolePermission(userPresence.role, VARIABLE_OPERATIONS.UPDATE) + if (!permissionCheck.allowed) { socket.emit('operation-forbidden', { type: 'INSUFFICIENT_PERMISSIONS', - message: authorization.reason || 'Insufficient permissions', + message: permissionCheck.reason || 'Insufficient permissions', operation: VARIABLE_OPERATIONS.UPDATE, target: 'variable', }) if (operationId) { socket.emit('operation-failed', { operationId, - error: authorization.reason || 'Insufficient permissions', + error: permissionCheck.reason || 'Insufficient permissions', retryable: false, }) } diff --git a/apps/realtime/src/handlers/workflow.ts b/apps/realtime/src/handlers/workflow.ts index 05ea8b8f41e..da977fcdb5e 100644 --- a/apps/realtime/src/handlers/workflow.ts +++ b/apps/realtime/src/handlers/workflow.ts @@ -160,7 +160,6 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: joinedAt: Date.now(), lastActivity: Date.now(), role: userRole, - roleCheckedAt: Date.now(), avatarUrl, } diff --git a/apps/realtime/src/index.test.ts b/apps/realtime/src/index.test.ts index 588ed7e0e47..f2d6d8e6868 100644 --- a/apps/realtime/src/index.test.ts +++ b/apps/realtime/src/index.test.ts @@ -73,11 +73,6 @@ vi.mock('@/middleware/permissions', () => ({ checkRolePermission: vi.fn().mockReturnValue({ allowed: true, }), - authorizeSocketOperation: vi.fn().mockResolvedValue({ - allowed: true, - role: 'admin', - accessRevoked: false, - }), })) vi.mock('@/database/operations', () => ({ diff --git a/apps/realtime/src/middleware/authorize-socket-operation.test.ts b/apps/realtime/src/middleware/authorize-socket-operation.test.ts deleted file mode 100644 index 9b16783e0c6..00000000000 --- a/apps/realtime/src/middleware/authorize-socket-operation.test.ts +++ /dev/null @@ -1,248 +0,0 @@ -/** - * @vitest-environment node - * - * Tests for `authorizeSocketOperation` — the per-event role re-validation that - * bounds how long a revoked or downgraded collaborator can keep mutating a - * workflow on an already-connected socket. - */ -import { beforeEach, describe, expect, it, vi } from 'vitest' - -const { mockLimit, mockAuthorize } = vi.hoisted(() => ({ - mockLimit: vi.fn(), - mockAuthorize: vi.fn(), -})) - -vi.mock('@sim/db', () => ({ - db: { - select: () => ({ - from: () => ({ - where: () => ({ - limit: mockLimit, - }), - }), - }), - }, -})) - -vi.mock('@sim/db/schema', () => ({ - workflow: {}, -})) - -vi.mock('drizzle-orm', () => ({ - and: vi.fn(() => ({})), - eq: vi.fn(() => ({})), - isNull: vi.fn(() => ({})), -})) - -vi.mock('@sim/workflow-authz', () => ({ - authorizeWorkflowByWorkspacePermission: mockAuthorize, -})) - -vi.mock('@sim/logger', () => ({ - createLogger: () => ({ - info: vi.fn(), - warn: vi.fn(), - error: vi.fn(), - debug: vi.fn(), - }), -})) - -import { authorizeSocketOperation } from '@/middleware/permissions' -import type { UserPresence } from '@/rooms/types' - -const WRITE_OP = 'add' // a write/admin operation (not in READ_OPERATIONS) -const READ_OP = 'update-position' // allowed for read role too - -function createManager() { - return { - updateUserRole: vi.fn().mockResolvedValue(undefined), - } -} - -function createPresence(overrides?: Partial): UserPresence { - return { - userId: 'user-1', - workflowId: 'workflow-1', - userName: 'Test User', - socketId: 'socket-1', - joinedAt: 1, - lastActivity: 1, - role: 'write', - roleCheckedAt: Date.now(), - ...overrides, - } -} - -/** Configures the live-permission lookup to grant `role`, or deny when null. */ -function grant(role: string | null) { - mockLimit.mockResolvedValue([{ workspaceId: 'ws-1', name: 'WF' }]) - if (role === null) { - mockAuthorize.mockResolvedValue({ allowed: false, message: 'denied' }) - } else { - mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: role }) - } -} - -describe('authorizeSocketOperation', () => { - beforeEach(() => { - vi.clearAllMocks() - }) - - it('uses the cached role without a DB read while the role is fresh', async () => { - const manager = createManager() - const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() }) - - const result = await authorizeSocketOperation({ - roomManager: manager as never, - workflowId: 'workflow-1', - socketId: 'socket-1', - userId: 'user-1', - presence, - operation: WRITE_OP, - }) - - expect(result).toEqual({ - allowed: true, - role: 'write', - reason: undefined, - accessRevoked: false, - }) - expect(mockAuthorize).not.toHaveBeenCalled() - expect(mockLimit).not.toHaveBeenCalled() - expect(manager.updateUserRole).not.toHaveBeenCalled() - }) - - it('re-validates against the DB once the cached role is stale', async () => { - grant('write') - const manager = createManager() - const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) - - const result = await authorizeSocketOperation({ - roomManager: manager as never, - workflowId: 'workflow-1', - socketId: 'socket-1', - userId: 'user-1', - presence, - operation: WRITE_OP, - }) - - expect(result.allowed).toBe(true) - expect(result.accessRevoked).toBe(false) - expect(mockAuthorize).toHaveBeenCalledTimes(1) - expect(manager.updateUserRole).toHaveBeenCalledWith('workflow-1', 'socket-1', 'write') - }) - - it('treats missing roleCheckedAt (pre-upgrade presence) as stale and re-validates', async () => { - grant('write') - const manager = createManager() - const presence = createPresence({ role: 'write' }) - presence.roleCheckedAt = undefined - - const result = await authorizeSocketOperation({ - roomManager: manager as never, - workflowId: 'workflow-1', - socketId: 'socket-1', - userId: 'user-1', - presence, - operation: WRITE_OP, - }) - - expect(result.allowed).toBe(true) - expect(mockAuthorize).toHaveBeenCalledTimes(1) - }) - - it('denies a write after a downgrade to read and refreshes the cached role', async () => { - grant('read') - const manager = createManager() - const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) - - const result = await authorizeSocketOperation({ - roomManager: manager as never, - workflowId: 'workflow-1', - socketId: 'socket-1', - userId: 'user-1', - presence, - operation: WRITE_OP, - }) - - expect(result.allowed).toBe(false) - expect(result.accessRevoked).toBe(false) - expect(result.role).toBe('read') - expect(manager.updateUserRole).toHaveBeenCalledWith('workflow-1', 'socket-1', 'read') - }) - - it('still allows a read-tier op after a downgrade to read', async () => { - grant('read') - const manager = createManager() - const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) - - const result = await authorizeSocketOperation({ - roomManager: manager as never, - workflowId: 'workflow-1', - socketId: 'socket-1', - userId: 'user-1', - presence, - operation: READ_OP, - }) - - expect(result.allowed).toBe(true) - expect(result.role).toBe('read') - }) - - it('reports accessRevoked when workspace permission has been removed', async () => { - grant(null) - const manager = createManager() - const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) - - const result = await authorizeSocketOperation({ - roomManager: manager as never, - workflowId: 'workflow-1', - socketId: 'socket-1', - userId: 'user-1', - presence, - operation: WRITE_OP, - }) - - expect(result.accessRevoked).toBe(true) - expect(result.allowed).toBe(false) - expect(manager.updateUserRole).not.toHaveBeenCalled() - }) - - it('reports accessRevoked when the workflow no longer exists', async () => { - mockLimit.mockResolvedValue([]) - const manager = createManager() - const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) - - const result = await authorizeSocketOperation({ - roomManager: manager as never, - workflowId: 'workflow-1', - socketId: 'socket-1', - userId: 'user-1', - presence, - operation: WRITE_OP, - }) - - expect(result.accessRevoked).toBe(true) - expect(mockAuthorize).not.toHaveBeenCalled() - }) - - it('falls back to the cached role on a transient DB error (no lockout)', async () => { - mockLimit.mockRejectedValue(new Error('connection reset')) - const manager = createManager() - const presence = createPresence({ role: 'write', roleCheckedAt: Date.now() - 60_000 }) - - const result = await authorizeSocketOperation({ - roomManager: manager as never, - workflowId: 'workflow-1', - socketId: 'socket-1', - userId: 'user-1', - presence, - operation: WRITE_OP, - }) - - expect(result.allowed).toBe(true) - expect(result.accessRevoked).toBe(false) - expect(result.role).toBe('write') - expect(manager.updateUserRole).not.toHaveBeenCalled() - }) -}) diff --git a/apps/realtime/src/middleware/permissions.ts b/apps/realtime/src/middleware/permissions.ts index 25435b69a5c..26a4d8c8542 100644 --- a/apps/realtime/src/middleware/permissions.ts +++ b/apps/realtime/src/middleware/permissions.ts @@ -13,17 +13,9 @@ import { } from '@sim/realtime-protocol/constants' import { authorizeWorkflowByWorkspacePermission } from '@sim/workflow-authz' import { and, eq, isNull } from 'drizzle-orm' -import type { IRoomManager, UserPresence } from '@/rooms/types' const logger = createLogger('SocketPermissions') -/** - * How long a cached role is trusted before it must be re-verified against the - * live `permissions` table. Bounds the window in which a revoked or downgraded - * collaborator can keep acting on a stale role on an already-connected socket. - */ -const ROLE_REVALIDATION_TTL_MS = 15_000 - // Admin-only operations (require admin role) const ADMIN_ONLY_OPERATIONS: string[] = [BLOCKS_OPERATIONS.BATCH_TOGGLE_LOCKED] @@ -91,63 +83,6 @@ export function checkRolePermission( return { allowed: true } } -/** - * Authorizes a mutating socket operation against the caller's *current* role. - * - * The role cached in presence is trusted only for `ROLE_REVALIDATION_TTL_MS`; - * once stale it is re-verified against the live `permissions` table and the - * refreshed role is written back to presence. This bounds how long a revoked or - * downgraded collaborator can keep mutating a workflow on an already-connected - * socket, complementing the push-based eviction triggered by the main app. - * - * Transient database failures during re-validation fall back to the last known - * role (without refreshing the timestamp, so the next operation retries) rather - * than locking out legitimate users during a blip. - */ -export async function authorizeSocketOperation(params: { - roomManager: IRoomManager - workflowId: string - socketId: string - userId: string - presence: UserPresence - operation: string -}): Promise<{ allowed: boolean; role: string; reason?: string; accessRevoked: boolean }> { - const { roomManager, workflowId, socketId, userId, presence, operation } = params - - let role = presence.role - const lastChecked = presence.roleCheckedAt ?? 0 - const isStale = Date.now() - lastChecked >= ROLE_REVALIDATION_TTL_MS - - if (isStale) { - try { - const access = await verifyWorkflowAccess(userId, workflowId) - if (!access.hasAccess) { - return { - allowed: false, - role, - reason: 'Access to this workflow has been revoked', - accessRevoked: true, - } - } - role = access.role || 'read' - await roomManager.updateUserRole(workflowId, socketId, role) - } catch (error) { - logger.warn( - `Failed to re-validate role for user ${userId} on workflow ${workflowId}; reusing cached role`, - error - ) - } - } - - const permissionCheck = checkRolePermission(role, operation) - return { - allowed: permissionCheck.allowed, - role, - reason: permissionCheck.reason, - accessRevoked: false, - } -} - /** * Verifies a user's access to a workflow via workspace permissions. * diff --git a/apps/realtime/src/rooms/access.test.ts b/apps/realtime/src/rooms/access.test.ts deleted file mode 100644 index 9b79cf9e8e8..00000000000 --- a/apps/realtime/src/rooms/access.test.ts +++ /dev/null @@ -1,162 +0,0 @@ -/** - * @vitest-environment node - * - * Tests for `reconcileWorkspaceAccessChange` — the push-driven reconciliation - * that evicts users whose workspace access was revoked and refreshes the cached - * role of users who were merely downgraded. - */ -import { beforeEach, describe, expect, it, vi } from 'vitest' - -const { mockWhere, mockVerifyWorkflowAccess } = vi.hoisted(() => ({ - mockWhere: vi.fn(), - mockVerifyWorkflowAccess: vi.fn(), -})) - -vi.mock('@sim/db', () => ({ - db: { - select: () => ({ - from: () => ({ - where: mockWhere, - }), - }), - }, -})) - -vi.mock('@sim/db/schema', () => ({ - workflow: {}, -})) - -vi.mock('drizzle-orm', () => ({ - and: vi.fn(() => ({})), - eq: vi.fn(() => ({})), - isNull: vi.fn(() => ({})), -})) - -vi.mock('@sim/logger', () => ({ - createLogger: () => ({ - info: vi.fn(), - warn: vi.fn(), - error: vi.fn(), - debug: vi.fn(), - }), -})) - -vi.mock('@/middleware/permissions', () => ({ - verifyWorkflowAccess: mockVerifyWorkflowAccess, -})) - -import { reconcileWorkspaceAccessChange } from '@/rooms/access' -import type { UserPresence } from '@/rooms/types' - -function presence(socketId: string, userId: string, role = 'write'): UserPresence { - return { - userId, - workflowId: 'workflow-1', - userName: userId, - socketId, - joinedAt: 1, - lastActivity: 1, - role, - } -} - -function createManager(users: UserPresence[]) { - const socketsLeave = vi.fn().mockResolvedValue(undefined) - const emit = vi.fn() - return { - socketsLeave, - emit, - hasWorkflowRoom: vi.fn().mockResolvedValue(true), - getWorkflowUsers: vi.fn().mockResolvedValue(users), - updateUserRole: vi.fn().mockResolvedValue(undefined), - removeUserFromRoom: vi.fn().mockResolvedValue('workflow-1'), - broadcastPresenceUpdate: vi.fn().mockResolvedValue(undefined), - io: { - to: vi.fn(() => ({ emit })), - in: vi.fn(() => ({ socketsLeave })), - }, - } -} - -describe('reconcileWorkspaceAccessChange', () => { - beforeEach(() => { - vi.clearAllMocks() - mockWhere.mockResolvedValue([{ id: 'workflow-1' }]) - }) - - it('evicts every socket of a user whose access was revoked', async () => { - mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: false }) - const manager = createManager([presence('socket-1', 'user-1'), presence('socket-2', 'user-1')]) - - await reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') - - expect(manager.io.to).toHaveBeenCalledWith('socket-1') - expect(manager.io.to).toHaveBeenCalledWith('socket-2') - expect(manager.emit).toHaveBeenCalledWith( - 'workflow-permissions-revoked', - expect.objectContaining({ workflowId: 'workflow-1' }) - ) - expect(manager.removeUserFromRoom).toHaveBeenCalledWith('socket-1', 'workflow-1') - expect(manager.removeUserFromRoom).toHaveBeenCalledWith('socket-2', 'workflow-1') - expect(manager.socketsLeave).toHaveBeenCalledTimes(2) - expect(manager.updateUserRole).not.toHaveBeenCalled() - expect(manager.broadcastPresenceUpdate).toHaveBeenCalledWith('workflow-1') - }) - - it('refreshes the cached role without eviction on a downgrade', async () => { - mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: true, role: 'read' }) - const manager = createManager([presence('socket-1', 'user-1', 'write')]) - - await reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') - - expect(manager.updateUserRole).toHaveBeenCalledWith('workflow-1', 'socket-1', 'read') - expect(manager.removeUserFromRoom).not.toHaveBeenCalled() - expect(manager.socketsLeave).not.toHaveBeenCalled() - expect(manager.emit).not.toHaveBeenCalled() - expect(manager.broadcastPresenceUpdate).toHaveBeenCalledWith('workflow-1') - }) - - it('does nothing for rooms where the user is not present', async () => { - const manager = createManager([presence('socket-9', 'someone-else')]) - - await reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') - - expect(mockVerifyWorkflowAccess).not.toHaveBeenCalled() - expect(manager.updateUserRole).not.toHaveBeenCalled() - expect(manager.removeUserFromRoom).not.toHaveBeenCalled() - }) - - it('skips workflows that have no active room', async () => { - const manager = createManager([presence('socket-1', 'user-1')]) - manager.hasWorkflowRoom.mockResolvedValue(false) - - await reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') - - expect(manager.getWorkflowUsers).not.toHaveBeenCalled() - expect(mockVerifyWorkflowAccess).not.toHaveBeenCalled() - }) - - it('continues to other workflows when one room fails', async () => { - mockWhere.mockResolvedValue([{ id: 'workflow-1' }, { id: 'workflow-2' }]) - mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: false }) - const manager = createManager([presence('socket-1', 'user-1')]) - manager.getWorkflowUsers - .mockRejectedValueOnce(new Error('redis blip')) - .mockResolvedValueOnce([presence('socket-1', 'user-1')]) - - await reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') - - // First workflow threw; second still evicted. - expect(manager.removeUserFromRoom).toHaveBeenCalledWith('socket-1', 'workflow-2') - }) - - it('returns without throwing when the workspace workflow lookup fails', async () => { - mockWhere.mockRejectedValue(new Error('db down')) - const manager = createManager([presence('socket-1', 'user-1')]) - - await expect( - reconcileWorkspaceAccessChange(manager as never, 'ws-1', 'user-1') - ).resolves.toBeUndefined() - expect(manager.hasWorkflowRoom).not.toHaveBeenCalled() - }) -}) diff --git a/apps/realtime/src/rooms/access.ts b/apps/realtime/src/rooms/access.ts deleted file mode 100644 index b7ae62dd83d..00000000000 --- a/apps/realtime/src/rooms/access.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { db } from '@sim/db' -import { workflow } from '@sim/db/schema' -import { createLogger } from '@sim/logger' -import { and, eq, isNull } from 'drizzle-orm' -import { verifyWorkflowAccess } from '@/middleware/permissions' -import type { IRoomManager } from '@/rooms/types' - -const logger = createLogger('RoomAccess') - -/** - * Reconciles active realtime rooms with a user's current workspace permission. - * - * For every non-archived workflow in the workspace that has an active room - * containing the user, the user's access is re-verified against the live - * `permissions` table: - * - If access was fully revoked, every one of the user's sockets is evicted from - * the room (works cross-pod via the Redis adapter) so they immediately stop - * receiving and persisting edits. - * - If access was merely downgraded, the cached role is refreshed in place so - * subsequent operations are authorized against the new role without waiting for - * the per-presence revalidation TTL to elapse. - */ -export async function reconcileWorkspaceAccessChange( - manager: IRoomManager, - workspaceId: string, - userId: string -): Promise { - let workflows: { id: string }[] - try { - workflows = await db - .select({ id: workflow.id }) - .from(workflow) - .where(and(eq(workflow.workspaceId, workspaceId), isNull(workflow.archivedAt))) - } catch (error) { - logger.error(`Failed to load workflows for workspace ${workspaceId} access change`, error) - return - } - - for (const { id: workflowId } of workflows) { - try { - const hasRoom = await manager.hasWorkflowRoom(workflowId) - if (!hasRoom) continue - - const users = await manager.getWorkflowUsers(workflowId) - const targets = users.filter((u) => u.userId === userId) - if (targets.length === 0) continue - - const access = await verifyWorkflowAccess(userId, workflowId) - - if (access.hasAccess) { - const role = access.role || 'read' - for (const target of targets) { - await manager.updateUserRole(workflowId, target.socketId, role) - } - await manager.broadcastPresenceUpdate(workflowId) - logger.info( - `Refreshed cached role to '${role}' for user ${userId} on workflow ${workflowId}` - ) - continue - } - - for (const target of targets) { - manager.io.to(target.socketId).emit('workflow-permissions-revoked', { - workflowId, - message: 'Your access to this workflow has been revoked', - timestamp: Date.now(), - }) - await manager.removeUserFromRoom(target.socketId, workflowId) - await manager.io.in(target.socketId).socketsLeave(workflowId) - } - await manager.broadcastPresenceUpdate(workflowId) - logger.info( - `Evicted ${targets.length} socket(s) for revoked user ${userId} from workflow ${workflowId}` - ) - } catch (error) { - logger.error(`Failed to reconcile access for user ${userId} on workflow ${workflowId}`, error) - } - } -} diff --git a/apps/realtime/src/rooms/memory-manager.ts b/apps/realtime/src/rooms/memory-manager.ts index e837657fd1c..a032e785bb5 100644 --- a/apps/realtime/src/rooms/memory-manager.ts +++ b/apps/realtime/src/rooms/memory-manager.ts @@ -1,6 +1,5 @@ import { createLogger } from '@sim/logger' import type { Server } from 'socket.io' -import { reconcileWorkspaceAccessChange } from '@/rooms/access' import type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/rooms/types' const logger = createLogger('MemoryRoomManager') @@ -127,17 +126,6 @@ export class MemoryRoomManager implements IRoomManager { } } - async updateUserRole(workflowId: string, socketId: string, role: string): Promise { - const room = this.workflowRooms.get(workflowId) - if (!room) return - - const presence = room.users.get(socketId) - if (presence) { - presence.role = role - presence.roleCheckedAt = Date.now() - } - } - async updateRoomLastModified(workflowId: string): Promise { const room = this.workflowRooms.get(workflowId) if (room) { @@ -267,9 +255,4 @@ export class MemoryRoomManager implements IRoomManager { logger.info(`Notified ${room.users.size} users about workflow deployment change: ${workflowId}`) } - - async handleWorkspaceAccessChange(workspaceId: string, userId: string): Promise { - logger.info(`Handling workspace access change for user ${userId} in workspace ${workspaceId}`) - await reconcileWorkspaceAccessChange(this, workspaceId, userId) - } } diff --git a/apps/realtime/src/rooms/redis-manager.ts b/apps/realtime/src/rooms/redis-manager.ts index 653c0b467de..0e6b3eadf2b 100644 --- a/apps/realtime/src/rooms/redis-manager.ts +++ b/apps/realtime/src/rooms/redis-manager.ts @@ -1,7 +1,6 @@ import { createLogger } from '@sim/logger' import { createClient, type RedisClientType } from 'redis' import type { Server } from 'socket.io' -import { reconcileWorkspaceAccessChange } from '@/rooms/access' import type { IRoomManager, UserPresence, UserSession } from '@/rooms/types' const logger = createLogger('RedisRoomManager') @@ -97,30 +96,6 @@ redis.call('EXPIRE', socketPresenceWorkflowKey, presenceWorkflowTtl) return 1 ` -/** - * Lua script for atomically updating a user's cached role and verification - * timestamp. Read-modify-write under a single script so a concurrent activity - * update on the same presence field cannot clobber the refreshed role. - */ -const UPDATE_ROLE_SCRIPT = ` -local workflowUsersKey = KEYS[1] -local socketId = ARGV[1] -local role = ARGV[2] -local roleCheckedAt = ARGV[3] - -local existingJson = redis.call('HGET', workflowUsersKey, socketId) -if not existingJson then - return 0 -end - -local existing = cjson.decode(existingJson) -existing.role = role -existing.roleCheckedAt = tonumber(roleCheckedAt) - -redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing)) -return 1 -` - /** * Redis-backed room manager for multi-pod deployments. * Uses Lua scripts for atomic operations to prevent race conditions. @@ -131,7 +106,6 @@ export class RedisRoomManager implements IRoomManager { private isConnected = false private removeUserScriptSha: string | null = null private updateActivityScriptSha: string | null = null - private updateRoleScriptSha: string | null = null constructor(io: Server, redisUrl: string) { this._io = io @@ -177,7 +151,6 @@ export class RedisRoomManager implements IRoomManager { // Pre-load Lua scripts for better performance this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT) this.updateActivityScriptSha = await this.redis.scriptLoad(UPDATE_ACTIVITY_SCRIPT) - this.updateRoleScriptSha = await this.redis.scriptLoad(UPDATE_ROLE_SCRIPT) logger.info('RedisRoomManager connected to Redis and scripts loaded') } catch (error) { @@ -358,32 +331,6 @@ export class RedisRoomManager implements IRoomManager { } } - async updateUserRole( - workflowId: string, - socketId: string, - role: string, - retried = false - ): Promise { - if (!this.updateRoleScriptSha) { - logger.error('updateUserRole called before initialize()') - return - } - - try { - await this.redis.evalSha(this.updateRoleScriptSha, { - keys: [KEYS.workflowUsers(workflowId)], - arguments: [socketId, role, Date.now().toString()], - }) - } catch (error) { - if ((error as Error).message?.includes('NOSCRIPT') && !retried) { - logger.warn('Lua script not found, reloading...') - this.updateRoleScriptSha = await this.redis.scriptLoad(UPDATE_ROLE_SCRIPT) - return this.updateUserRole(workflowId, socketId, role, true) - } - logger.error(`Failed to update user role: ${socketId}`, error) - } - } - async updateRoomLastModified(workflowId: string): Promise { await this.redis.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString()) } @@ -510,9 +457,4 @@ export class RedisRoomManager implements IRoomManager { const userCount = await this.getUniqueUserCount(workflowId) logger.info(`Notified ${userCount} users about workflow deployment change: ${workflowId}`) } - - async handleWorkspaceAccessChange(workspaceId: string, userId: string): Promise { - logger.info(`Handling workspace access change for user ${userId} in workspace ${workspaceId}`) - await reconcileWorkspaceAccessChange(this, workspaceId, userId) - } } diff --git a/apps/realtime/src/rooms/types.ts b/apps/realtime/src/rooms/types.ts index b9da9cf7e49..9553a427e1e 100644 --- a/apps/realtime/src/rooms/types.ts +++ b/apps/realtime/src/rooms/types.ts @@ -12,12 +12,6 @@ export interface UserPresence { joinedAt: number lastActivity: number role: string - /** - * Timestamp (ms) of the last time `role` was verified against the live - * `permissions` table. Used to bound how long a revoked or downgraded - * collaborator can keep acting on a stale, cached role. - */ - roleCheckedAt?: number cursor?: { x: number; y: number } selection?: { type: 'block' | 'edge' | 'none'; id?: string } avatarUrl?: string | null @@ -105,12 +99,6 @@ export interface IRoomManager { updates: Partial> ): Promise - /** - * Update a user's cached workspace role and refresh its verification - * timestamp. Called after re-validating access against the database. - */ - updateUserRole(workflowId: string, socketId: string, role: string): Promise - /** * Update room's lastModified timestamp */ @@ -155,11 +143,4 @@ export interface IRoomManager { * Handle workflow deployment change - notify users to refresh deployment state */ handleWorkflowDeployed(workflowId: string): Promise - - /** - * Handle a workspace permission change for a user. Evicts the user from any - * active workflow rooms in the workspace where their access has been revoked, - * and refreshes their cached role where access was merely downgraded. - */ - handleWorkspaceAccessChange(workspaceId: string, userId: string): Promise } diff --git a/apps/realtime/src/routes/http.ts b/apps/realtime/src/routes/http.ts index 3ecfcbd6650..0f8ed73cc52 100644 --- a/apps/realtime/src/routes/http.ts +++ b/apps/realtime/src/routes/http.ts @@ -150,25 +150,6 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) { return } - // Handle workspace permission change notifications from the main API. - // Evicts users whose access was revoked and refreshes downgraded roles. - if (req.method === 'POST' && req.url === '/api/permissions-updated') { - try { - const body = await readRequestBody(req) - const { workspaceId, userId } = JSON.parse(body) - if (typeof workspaceId !== 'string' || typeof userId !== 'string') { - sendError(res, 'workspaceId and userId are required', 400) - return - } - await roomManager.handleWorkspaceAccessChange(workspaceId, userId) - sendSuccess(res) - } catch (error) { - logger.error('Error handling permissions update notification:', error) - sendError(res, 'Failed to process permissions update notification') - } - return - } - res.writeHead(404, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ error: 'Not found' })) } diff --git a/apps/realtime/src/routes/permissions-updated.test.ts b/apps/realtime/src/routes/permissions-updated.test.ts deleted file mode 100644 index 5ecd8751b30..00000000000 --- a/apps/realtime/src/routes/permissions-updated.test.ts +++ /dev/null @@ -1,115 +0,0 @@ -/** - * @vitest-environment node - * - * Tests for the `/api/permissions-updated` HTTP endpoint that the main app calls - * to reconcile realtime rooms after a workspace permission change. - */ -import type { IncomingMessage, ServerResponse } from 'node:http' -import { Readable } from 'node:stream' -import { beforeEach, describe, expect, it, vi } from 'vitest' -import { createHttpHandler } from '@/routes/http' - -const API_KEY = 'test-internal-api-secret-at-least-32-chars' - -function createLogger() { - return { info: vi.fn(), error: vi.fn(), debug: vi.fn(), warn: vi.fn() } -} - -function createRoomManager(overrides?: Record) { - return { - isReady: vi.fn().mockReturnValue(true), - handleWorkspaceAccessChange: vi.fn().mockResolvedValue(undefined), - ...overrides, - } -} - -function createRequest(body: unknown, headers: Record = {}): IncomingMessage { - const req = Readable.from([ - typeof body === 'string' ? body : JSON.stringify(body), - ]) as unknown as IncomingMessage - req.method = 'POST' - req.url = '/api/permissions-updated' - req.headers = headers - return req -} - -function createResponse() { - const res = { - statusCode: 0, - body: '', - writeHead: vi.fn((status: number) => { - res.statusCode = status - return res - }), - end: vi.fn((chunk?: string) => { - if (chunk) res.body = chunk - }), - } - return res as unknown as ServerResponse & { statusCode: number; body: string } -} - -async function invoke(roomManager: ReturnType, req: IncomingMessage) { - const res = createResponse() - const handler = createHttpHandler(roomManager as never, createLogger()) - await handler(req, res) - return res -} - -describe('POST /api/permissions-updated', () => { - beforeEach(() => { - vi.clearAllMocks() - }) - - it('rejects requests without a valid internal API key', async () => { - const roomManager = createRoomManager() - const res = await invoke(roomManager, createRequest({ workspaceId: 'ws-1', userId: 'user-1' })) - - expect(res.statusCode).toBe(401) - expect(roomManager.handleWorkspaceAccessChange).not.toHaveBeenCalled() - }) - - it('returns 400 when workspaceId or userId is missing', async () => { - const roomManager = createRoomManager() - const res = await invoke( - roomManager, - createRequest({ workspaceId: 'ws-1' }, { 'x-api-key': API_KEY }) - ) - - expect(res.statusCode).toBe(400) - expect(roomManager.handleWorkspaceAccessChange).not.toHaveBeenCalled() - }) - - it('delegates to handleWorkspaceAccessChange on a valid request', async () => { - const roomManager = createRoomManager() - const res = await invoke( - roomManager, - createRequest({ workspaceId: 'ws-1', userId: 'user-1' }, { 'x-api-key': API_KEY }) - ) - - expect(res.statusCode).toBe(200) - expect(roomManager.handleWorkspaceAccessChange).toHaveBeenCalledWith('ws-1', 'user-1') - }) - - it('returns 503 when the room manager is not ready', async () => { - const roomManager = createRoomManager({ isReady: vi.fn().mockReturnValue(false) }) - const res = await invoke( - roomManager, - createRequest({ workspaceId: 'ws-1', userId: 'user-1' }, { 'x-api-key': API_KEY }) - ) - - expect(res.statusCode).toBe(503) - expect(roomManager.handleWorkspaceAccessChange).not.toHaveBeenCalled() - }) - - it('returns 500 when reconciliation throws', async () => { - const roomManager = createRoomManager({ - handleWorkspaceAccessChange: vi.fn().mockRejectedValue(new Error('boom')), - }) - const res = await invoke( - roomManager, - createRequest({ workspaceId: 'ws-1', userId: 'user-1' }, { 'x-api-key': API_KEY }) - ) - - expect(res.statusCode).toBe(500) - }) -}) diff --git a/apps/sim/app/api/workspaces/[id]/permissions/route.ts b/apps/sim/app/api/workspaces/[id]/permissions/route.ts index bf4d13856d7..8e9bbd5bf32 100644 --- a/apps/sim/app/api/workspaces/[id]/permissions/route.ts +++ b/apps/sim/app/api/workspaces/[id]/permissions/route.ts @@ -11,7 +11,6 @@ import { getSession } from '@/lib/auth' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { syncWorkspaceEnvCredentials } from '@/lib/credentials/environment' import { captureServerEvent } from '@/lib/posthog/server' -import { notifyWorkspaceAccessChanged } from '@/lib/workspaces/permissions/realtime' import { checkWorkspaceAccess, getUserEntityPermissions, @@ -197,14 +196,6 @@ export const PATCH = withRouteHandler( }) } - // Reconcile active realtime rooms so downgraded collaborators lose write - // access promptly instead of riding their cached role until disconnect. - await Promise.all( - body.updates - .filter((update) => permLookup.get(update.userId)?.permission !== update.permissions) - .map((update) => notifyWorkspaceAccessChanged(workspaceId, update.userId)) - ) - const updatedUsers = await getUsersWithPermissions(workspaceId) for (const update of body.updates) { diff --git a/apps/sim/app/api/workspaces/members/[id]/route.ts b/apps/sim/app/api/workspaces/members/[id]/route.ts index ce18f3f306d..8679aea74c5 100644 --- a/apps/sim/app/api/workspaces/members/[id]/route.ts +++ b/apps/sim/app/api/workspaces/members/[id]/route.ts @@ -12,7 +12,6 @@ import { reconcileOrganizationSeats } from '@/lib/billing/organizations/seats' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { revokeWorkspaceCredentialMembershipsTx } from '@/lib/credentials/access' import { captureServerEvent } from '@/lib/posthog/server' -import { notifyWorkspaceAccessChanged } from '@/lib/workspaces/permissions/realtime' import { hasWorkspaceAdminAccess } from '@/lib/workspaces/permissions/utils' import { reassignWorkflowOwnershipForWorkspaceMemberRemovalTx, @@ -156,10 +155,6 @@ export const DELETE = withRouteHandler( } ) - // Evict the removed user from any active realtime workflow rooms so their - // live read/write access ends immediately, not just on the next REST call. - await notifyWorkspaceAccessChanged(workspaceId, userId) - /** * Seats are tied to organization membership (one per member), so a * single-workspace removal only drops a seat when it leaves the member diff --git a/apps/sim/lib/workspaces/permissions/realtime.ts b/apps/sim/lib/workspaces/permissions/realtime.ts deleted file mode 100644 index 60c2d7fc8fc..00000000000 --- a/apps/sim/lib/workspaces/permissions/realtime.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { createLogger } from '@sim/logger' -import { env } from '@/lib/core/config/env' -import { getSocketServerUrl } from '@/lib/core/utils/urls' - -const logger = createLogger('WorkspacePermissionsRealtime') - -/** - * Notifies the realtime server that a user's workspace permission changed so it - * can reconcile any active workflow rooms — evicting the user where access was - * revoked and refreshing their cached role where it was downgraded. - * - * Best-effort: failures are logged but never block the permission mutation, and - * the realtime server independently re-validates cached roles on a short TTL. - */ -export async function notifyWorkspaceAccessChanged( - workspaceId: string, - userId: string -): Promise { - try { - const response = await fetch(`${getSocketServerUrl()}/api/permissions-updated`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-api-key': env.INTERNAL_API_SECRET, - }, - body: JSON.stringify({ workspaceId, userId }), - }) - - if (!response.ok) { - logger.warn( - `Failed to notify realtime of access change for user ${userId} in workspace ${workspaceId} (${response.status})` - ) - } - } catch (error) { - logger.warn( - `Error notifying realtime of access change for user ${userId} in workspace ${workspaceId}`, - { error } - ) - } -}