Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions apps/realtime/src/handlers/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz'
import { ZodError } from 'zod'
import { persistWorkflowOperation } from '@/database/operations'
import type { AuthenticatedSocket } from '@/middleware/auth'
import { checkRolePermission } from '@/middleware/permissions'
import { checkWorkflowOperationPermission } from '@/middleware/permissions'
import type { IRoomManager, UserSession } from '@/rooms'

const logger = createLogger('OperationsHandlers')
Expand Down Expand Up @@ -125,11 +125,17 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager

await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })

// Check permissions using cached role (no DB query)
const permissionCheck = checkRolePermission(userPresence.role, operation)
// Re-validate the workspace role against the DB (cached per pod for a short
// window) so revoked or downgraded collaborators lose write access live.
const permissionCheck = await checkWorkflowOperationPermission(
session.userId,
workflowId,
operation,
userPresence.role
)
if (!permissionCheck.allowed) {
logger.warn(
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
`User ${session.userId} (role: ${permissionCheck.role ?? 'none'}) forbidden from ${operation} on ${target}`
)
emitOperationError({
type: 'INSUFFICIENT_PERMISSIONS',
Expand Down
9 changes: 7 additions & 2 deletions apps/realtime/src/handlers/subblocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz'
import { isWorkflowBlockProtected } from '@sim/workflow-types/workflow'
import { and, eq } from 'drizzle-orm'
import type { AuthenticatedSocket } from '@/middleware/auth'
import { checkRolePermission } from '@/middleware/permissions'
import { checkWorkflowOperationPermission } from '@/middleware/permissions'
import type { IRoomManager } from '@/rooms'

const logger = createLogger('SubblocksHandlers')
Expand Down Expand Up @@ -136,7 +136,12 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
return
}

const permissionCheck = checkRolePermission(userPresence.role, SUBBLOCK_OPERATIONS.UPDATE)
const permissionCheck = await checkWorkflowOperationPermission(
session.userId,
workflowId,
SUBBLOCK_OPERATIONS.UPDATE,
userPresence.role
)
if (!permissionCheck.allowed) {
socket.emit('operation-forbidden', {
type: 'INSUFFICIENT_PERMISSIONS',
Expand Down
9 changes: 7 additions & 2 deletions apps/realtime/src/handlers/variables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { getErrorMessage } from '@sim/utils/errors'
import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz'
import { eq } from 'drizzle-orm'
import type { AuthenticatedSocket } from '@/middleware/auth'
import { checkRolePermission } from '@/middleware/permissions'
import { checkWorkflowOperationPermission } from '@/middleware/permissions'
import type { IRoomManager } from '@/rooms'

const logger = createLogger('VariablesHandlers')
Expand Down Expand Up @@ -124,7 +124,12 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
return
}

const permissionCheck = checkRolePermission(userPresence.role, VARIABLE_OPERATIONS.UPDATE)
const permissionCheck = await checkWorkflowOperationPermission(
session.userId,
workflowId,
VARIABLE_OPERATIONS.UPDATE,
userPresence.role
)
if (!permissionCheck.allowed) {
socket.emit('operation-forbidden', {
type: 'INSUFFICIENT_PERMISSIONS',
Expand Down
4 changes: 4 additions & 0 deletions apps/realtime/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ vi.mock('@/middleware/permissions', () => ({
checkRolePermission: vi.fn().mockReturnValue({
allowed: true,
}),
checkWorkflowOperationPermission: vi.fn().mockResolvedValue({
allowed: true,
role: 'admin',
}),
}))

vi.mock('@/database/operations', () => ({
Expand Down
139 changes: 137 additions & 2 deletions apps/realtime/src/middleware/permissions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,17 @@ import {
ROLE_ALLOWED_OPERATIONS,
SOCKET_OPERATIONS,
} from '@sim/testing'
import { describe, expect, it } from 'vitest'
import { checkRolePermission } from '@/middleware/permissions'
import { beforeEach, describe, expect, it, vi } from 'vitest'

// Shared mock that stands in for `authorizeWorkflowByWorkspacePermission`.
// It is created inside `vi.hoisted` so that it already exists when the `vi.mock`
// factory below references it (Vitest hoists `vi.mock` calls above the imports).
const { mockAuthorize } = vi.hoisted(() => ({
mockAuthorize: vi.fn(),
}))

// Replace the '@sim/workflow-authz' module with a stub whose only export is the
// mock above, so each test decides what the authorization lookup returns.
vi.mock('@sim/workflow-authz', () => ({
authorizeWorkflowByWorkspacePermission: mockAuthorize,
}))

import { checkRolePermission, checkWorkflowOperationPermission } from '@/middleware/permissions'

describe('checkRolePermission', () => {
describe('admin role', () => {
Expand Down Expand Up @@ -279,3 +288,129 @@ describe('checkRolePermission', () => {
})
})
})

/**
 * Behavioural tests for the live permission gate. `mockAuthorize` stands in for the
 * authorization lookup, so every test controls exactly what the re-validation sees:
 * a granted role, a revocation, or a thrown (transient) error.
 */
describe('checkWorkflowOperationPermission', () => {
  const userId = 'user-1'
  let workflowCounter = 0
  let workflowId: string

  beforeEach(() => {
    vi.clearAllMocks()
    // Unique workflowId per test so the module-level role cache never leaks across tests
    workflowCounter += 1
    workflowId = `wf-${workflowCounter}`
  })

  it('allows a write operation when the user still has write access', async () => {
    mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'write' })

    const result = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read')

    expect(result.allowed).toBe(true)
    expect(result.role).toBe('write')
    // The re-validation only asks for read-level authorization; the operation itself is
    // then checked against the role that comes back.
    expect(mockAuthorize).toHaveBeenCalledWith({ workflowId, userId, action: 'read' })
  })

  it('denies all writes once workspace access has been revoked', async () => {
    mockAuthorize.mockResolvedValue({ allowed: false, workspacePermission: null })

    const result = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'write')

    expect(result.allowed).toBe(false)
    expect(result.role).toBeNull()
    expect(result.reason).toMatch(/revoked/i)
  })

  it('denies writes after a downgrade to read but still allows position updates', async () => {
    mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'read' })

    const denied = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'write')
    expect(denied.allowed).toBe(false)
    expect(denied.role).toBe('read')

    const allowed = await checkWorkflowOperationPermission(
      userId,
      workflowId,
      'update-position',
      'write'
    )
    expect(allowed.allowed).toBe(true)
    expect(allowed.role).toBe('read')
  })

  it('caches the role within the TTL to avoid a DB read on every operation', async () => {
    mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'write' })

    await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read')
    await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read')

    expect(mockAuthorize).toHaveBeenCalledTimes(1)
  })

  it('re-reads the role after the cache TTL expires', async () => {
    vi.useFakeTimers()
    try {
      mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'write' })
      await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read')

      // Downgraded to read after the first check
      mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'read' })
      vi.advanceTimersByTime(31_000)

      const result = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'write')
      expect(mockAuthorize).toHaveBeenCalledTimes(2)
      expect(result.allowed).toBe(false)
      expect(result.role).toBe('read')
    } finally {
      // Always restore real timers so a failing assertion cannot poison later tests
      vi.useRealTimers()
    }
  })

  it('falls back to the join-time role on a transient DB error when nothing is cached yet', async () => {
    mockAuthorize.mockRejectedValue(new Error('db unavailable'))

    const result = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'write')

    expect(result.allowed).toBe(true)
    expect(result.role).toBe('write')
  })

  it('preserves a recorded revocation through a later transient DB error', async () => {
    vi.useFakeTimers()
    try {
      // First check records the revocation (null) in the cache
      mockAuthorize.mockResolvedValue({ allowed: false, workspacePermission: null })
      const first = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'admin')
      expect(first.allowed).toBe(false)
      expect(first.role).toBeNull()

      // TTL expires, then the DB blips on the next re-validation. The stale join-time
      // role ('admin') must NOT resurrect access — the recorded revocation wins.
      vi.advanceTimersByTime(31_000)
      mockAuthorize.mockRejectedValue(new Error('db unavailable'))

      const second = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'admin')
      expect(second.allowed).toBe(false)
      expect(second.role).toBeNull()
    } finally {
      vi.useRealTimers()
    }
  })

  it('uses the last cached role (not the join-time role) on a transient DB error', async () => {
    vi.useFakeTimers()
    try {
      mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'write' })
      await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read')

      vi.advanceTimersByTime(31_000)
      mockAuthorize.mockRejectedValue(new Error('db unavailable'))

      // fallbackRole is 'read', but the last recorded decision was 'write' — use that
      const result = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read')
      expect(result.allowed).toBe(true)
      expect(result.role).toBe('write')
    } finally {
      vi.useRealTimers()
    }
  })
})
105 changes: 105 additions & 0 deletions apps/realtime/src/middleware/permissions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,111 @@ export function checkRolePermission(
return { allowed: true }
}

/**
 * Size threshold at which expired entries are opportunistically swept. This is a soft
 * cap: the sweep only removes expired entries, so live ones are never evicted.
 */
const MAX_ROLE_CACHE_ENTRIES = 5_000

/**
 * Lifetime, in milliseconds, of a cached role before it has to be re-read.
 * This is the upper bound on how long a collaborator whose access was revoked or
 * downgraded can keep writing over a socket that is already connected.
 */
const ROLE_REVALIDATION_TTL_MS = 30 * 1_000

/** One cached authorization decision for a (user, workflow) pair. */
interface CachedRole {
  /** Epoch-millisecond timestamp from which this entry counts as stale. */
  expiresAt: number
  /** Authoritative workspace role; `null` records that the user has no access. */
  role: string | null
}

/**
 * Pod-local store of authoritative workspace roles. Keys have the form
 * `${userId}:${workflowId}`.
 *
 * A socket connection stays pinned to one pod, so all of its mutating operations are
 * gated by that pod's copy of this cache. Staleness is bounded purely by TTL expiry
 * ({@link ROLE_REVALIDATION_TTL_MS}) rather than by cross-pod invalidation, which is
 * why this stays correct in a multi-pod deployment without shared state.
 */
const roleCache: Map<string, CachedRole> = new Map()

/**
 * Removes every cache entry whose expiry time is at or before `now`.
 * Entries that are still fresh are left untouched.
 *
 * @param now - Reference timestamp in epoch milliseconds.
 */
function purgeExpiredRoles(now: number): void {
  // Deleting the entry currently being visited is safe during Map iteration.
  roleCache.forEach((cachedRole, cacheKey) => {
    if (now >= cachedRole.expiresAt) {
      roleCache.delete(cacheKey)
    }
  })
}

/**
 * Looks up the workspace role a user currently holds for a workflow, hitting the
 * authorization source at most once per {@link ROLE_REVALIDATION_TTL_MS} per pod.
 *
 * Failure handling: when the lookup throws, the most recent recorded decision for this
 * (user, workflow) is reused — even if it has expired, and even if it is a recorded
 * revocation (`null`). `fallbackRole` is trusted only when no decision has ever been
 * recorded, so an outage neither locks out legitimate editors nor restores access that
 * was already revoked.
 *
 * @param userId - User whose role is being resolved.
 * @param workflowId - Workflow the role applies to.
 * @param fallbackRole - Role to return if the lookup fails and nothing is cached.
 * @returns The current role, or `null` when the user has no access.
 */
export async function resolveCurrentWorkflowRole(
  userId: string,
  workflowId: string,
  fallbackRole: string
): Promise<string | null> {
  const cacheKey = `${userId}:${workflowId}`
  const checkedAt = Date.now()

  // Fast path: a fresh entry answers the question without touching the database.
  const fresh = roleCache.get(cacheKey)
  if (fresh !== undefined && checkedAt < fresh.expiresAt) {
    return fresh.role
  }

  try {
    const authorization = await authorizeWorkflowByWorkspacePermission({
      workflowId,
      userId,
      action: 'read',
    })

    // A denied authorization and a missing permission both collapse to "no access".
    let resolvedRole: string | null = null
    if (authorization.allowed) {
      resolvedRole = authorization.workspacePermission ?? null
    }

    // Keep the map from growing without bound by sweeping stale entries at the soft cap.
    if (roleCache.size >= MAX_ROLE_CACHE_ENTRIES) {
      purgeExpiredRoles(checkedAt)
    }
    roleCache.set(cacheKey, {
      role: resolvedRole,
      expiresAt: checkedAt + ROLE_REVALIDATION_TTL_MS,
    })
    return resolvedRole
  } catch (error) {
    logger.warn(
      `Failed to re-validate role for user ${userId} on workflow ${workflowId}; using last known role`,
      error
    )
    // The previous decision wins over the join-time role, expired or not: a recorded
    // revocation must survive a transient failure rather than be undone by stale data.
    const previous = roleCache.get(cacheKey)
    if (previous === undefined) {
      return fallbackRole
    }
    return previous.role
  }
}

/**
 * Permission gate applied to mutating socket operations on a live connection.
 *
 * The user's workspace role is re-resolved (cached per pod for
 * {@link ROLE_REVALIDATION_TTL_MS}) before the operation is checked against it, so a
 * collaborator who was revoked or downgraded loses write access without having to
 * rejoin the workflow.
 *
 * @param userId - User attempting the operation.
 * @param workflowId - Workflow being mutated.
 * @param operation - Operation name to check against the resolved role.
 * @param fallbackRole - Role to use if re-validation fails and nothing is cached.
 * @returns The permission verdict together with the role it was based on
 *   (`null` when access has been revoked).
 */
export async function checkWorkflowOperationPermission(
  userId: string,
  workflowId: string,
  operation: string,
  fallbackRole: string
): Promise<{ allowed: boolean; reason?: string; role: string | null }> {
  const currentRole = await resolveCurrentWorkflowRole(userId, workflowId, fallbackRole)

  if (currentRole) {
    const verdict = checkRolePermission(currentRole, operation)
    return { ...verdict, role: currentRole }
  }

  // No role at all: the user no longer has any access to this workflow.
  return {
    allowed: false,
    reason: 'Access to this workflow has been revoked',
    role: null,
  }
}

/**
* Verifies a user's access to a workflow via workspace permissions.
*
Expand Down
Loading