From f3b112819ec20b101e1fac7e6bdf11a90f349343 Mon Sep 17 00:00:00 2001 From: Jakub Zajac Date: Fri, 12 Dec 2025 00:39:00 +0000 Subject: [PATCH] Add subscription permissions rules --- src/routes/graphql/ws.ts | 136 +++++++++++++++++++-------------- src/subscriptionPermissions.ts | 81 ++++++++++++++++++++ 2 files changed, 160 insertions(+), 57 deletions(-) create mode 100644 src/subscriptionPermissions.ts diff --git a/src/routes/graphql/ws.ts b/src/routes/graphql/ws.ts index 5821553..bda8eea 100644 --- a/src/routes/graphql/ws.ts +++ b/src/routes/graphql/ws.ts @@ -3,72 +3,94 @@ import { Duplex } from 'stream'; import WebSocket from 'ws'; import { logger } from '~helpers'; +import { validateSubscription } from '../../subscriptionPermissions'; // The host of the AppSync API (non-realtime) const WS_HOST = new URL('/', process.env.APPSYNC_API).host; // In production the Amplify WebSocket API is secure and requires a different endpoint -const WS_ENDPOINT = process.env.NODE_ENV === 'dev' ? `ws://${WS_HOST}` : process.env.APPSYNC_WSS_API; +const WS_ENDPOINT = + process.env.NODE_ENV === 'dev' + ? `ws://${WS_HOST}` + : process.env.APPSYNC_WSS_API; const wss = new WebSocket.Server({ noServer: true }); // Custom websocker upgrade handler // This proxies websocket requests and adds the necessary headers for Amplify authorization if applicable -export const handleWsUpgrade = (req: InstanceType, socket: Duplex, head: Buffer) => { - // localhost is fine here, we're just using the path and the query string - const url = new URL(req.url || '', 'http://localhost'); - const authHeaders = { - host: WS_HOST, - // Creates a date in the format YYYYMMDDTHHMMSSZ - 'x-amz-date': new Date().toISOString().replace(/[\-:]/g,'').replace(/\.\d{3}/,''), - 'x-api-key': process.env.APPSYNC_API_KEY, - } - // Add "header" query string parameter (default is {}) - url.searchParams.set('header', btoa((JSON.stringify(authHeaders)))); - const proxyPath = `${url.pathname}?${url.searchParams.toString()}` - // Establish a websocket connection to Amplify - const targetWs = new WebSocket( - `${WS_ENDPOINT}${proxyPath}`, - req.headers['sec-websocket-protocol'] || 'graphql-ws', - ); - targetWs.on('open', () => { - wss.handleUpgrade(req, socket, head, (ws) => { - // Add authorization headers to incoming client messages - ws.on('message', (data) => { - let parsed; - try { - parsed = JSON.parse(data.toString()); - } catch (e) { - logger('Failed to parse websocket message', e); - return; - } - if (parsed.payload?.extensions?.authorization) { - parsed.payload.extensions.authorization = { - ...parsed.payload.extensions.authorization, - ...authHeaders, - }; - return targetWs.send(JSON.stringify(parsed)); - } - targetWs.send(data.toString()); - }); - ws.on('close', () => { - targetWs.close(); - }); - ws.on('error', (err) => { - logger('WebSocket error from client: ', err); - targetWs.close(); - }); +export const handleWsUpgrade = ( + req: InstanceType, + socket: Duplex, + head: Buffer, +) => { + // localhost is fine here, we're just using the path and the query string + const url = new URL(req.url || '', 'http://localhost'); + const authHeaders = { + host: WS_HOST, + // Creates a date in the format YYYYMMDDTHHMMSSZ + 'x-amz-date': new Date() + .toISOString() + .replace(/[\-:]/g, '') + .replace(/\.\d{3}/, ''), + 'x-api-key': process.env.APPSYNC_API_KEY, + }; + // Add "header" query string parameter (default is {}) + url.searchParams.set('header', btoa(JSON.stringify(authHeaders))); + const proxyPath = `${url.pathname}?${url.searchParams.toString()}`; + // Establish a websocket connection to Amplify + const targetWs = new WebSocket( + `${WS_ENDPOINT}${proxyPath}`, + req.headers['sec-websocket-protocol'] || 'graphql-ws', + ); + targetWs.on('open', () => { + wss.handleUpgrade(req, socket, head, (ws) => { + // Add authorization headers to incoming client messages + ws.on('message', (data) => { + let parsed; + try { + parsed = JSON.parse(data.toString()); + } catch (e) { + logger('Failed to parse websocket message', e); + return; + } - // Pass through messages from Amplify to the client - targetWs.on('message', (data) => { - ws.send(data.toString()); - }); - targetWs.on('close', () => { - ws.close(); - }); - targetWs.on('error', (err) => { - logger('WebSocket error from Amplify: ', err); - ws.close(); - }); - }); + // Validate subscription queries on start + if (parsed.type === 'start' && parsed.payload?.data) { + const parsedData = JSON.parse(parsed.payload.data); + if (!validateSubscription(parsedData.query)) { + ws.close(); + targetWs.close(); + return; + } + } + + if (parsed.payload?.extensions?.authorization) { + parsed.payload.extensions.authorization = { + ...parsed.payload.extensions.authorization, + ...authHeaders, + }; + return targetWs.send(JSON.stringify(parsed)); + } + targetWs.send(data.toString()); + }); + ws.on('close', () => { + targetWs.close(); + }); + ws.on('error', (err) => { + logger('WebSocket error from client: ', err); + targetWs.close(); + }); + + // Pass through messages from Amplify to the client + targetWs.on('message', (data) => { + ws.send(data.toString()); + }); + targetWs.on('close', () => { + ws.close(); + }); + targetWs.on('error', (err) => { + logger('WebSocket error from Amplify: ', err); + ws.close(); + }); }); + }); }; diff --git a/src/subscriptionPermissions.ts b/src/subscriptionPermissions.ts new file mode 100644 index 0000000..ec7f4eb --- /dev/null +++ b/src/subscriptionPermissions.ts @@ -0,0 +1,81 @@ +import { parse, visit, visitWithTypeInfo, TypeInfo } from 'graphql'; + +import { getSchema } from './schema'; +import { logger } from '~helpers'; + +const allowedSubscriptions = [ + 'onCreateColonyActionMetadata', + 'onUpdateColony', + 'onCreateColonyContributor', + 'onUpdateColonyContributor', +]; + +const blockedFields: Record = { + Profile: ['email'], + User: [ + 'bridgeCustomerId', + 'privateBetaInviteCode', + 'userPrivateBetaInviteCodeId', + ], + Colony: ['colonyMemberInvite', 'colonyMemberInviteCode'], +}; + +export const validateSubscription = (query: string): boolean => { + let document; + try { + document = parse(query); + } catch { + logger('Subscription rejected: Invalid query'); + return false; + } + + // Check if the subscription is allowed + for (const def of document.definitions) { + if (def.kind !== 'OperationDefinition') continue; + + if (def.operation !== 'subscription') { + logger('Subscription rejected: Non-subscription operation in document'); + return false; + } + + const firstField = def.selectionSet.selections[0]; + if (!firstField || firstField.kind !== 'Field') { + logger('Subscription rejected: No field selected'); + return false; + } + + const subscriptionName = firstField.name.value; + if (!allowedSubscriptions.includes(subscriptionName)) { + logger('Subscription rejected:', subscriptionName); + return false; + } + } + + // Check for blocked fields + const typeInfo = new TypeInfo(getSchema()); + let blocked = false; + + visit( + document, + visitWithTypeInfo(typeInfo, { + Field: { + enter(node) { + const parentType = typeInfo.getParentType(); + if (!parentType) return; + + const blockedList = blockedFields[parentType.name]; + if (blockedList?.includes(node.name.value)) { + logger( + 'Subscription rejected due to blocked field:', + `${parentType.name}.${node.name.value}`, + ); + blocked = true; + return false; + } + }, + }, + }), + ); + + return !blocked; +};