diff --git a/Tokenization/backend/proto/wrapper.proto b/Tokenization/backend/proto/wrapper.proto index cd74d8d7d..116131b80 100644 --- a/Tokenization/backend/proto/wrapper.proto +++ b/Tokenization/backend/proto/wrapper.proto @@ -25,6 +25,12 @@ service CentralSystem { rpc ClientStream(stream Payload) returns (stream Payload); } +// Peer2Peer service handling HTTP-like requests between wrapper clients +service Peer2Peer { + rpc Fetch (HttpLikeRequest) returns (HttpLikeResponse); +} + + // ====================================== // MESSAGES // ====================================== @@ -51,14 +57,37 @@ message Payload { } } +// http method enum +enum HttpMethod { + HTTP_METHOD_UNSPECIFIED = 0; + GET = 1; + POST = 2; + PUT = 3; + PATCH = 4; + DELETE = 5; + HEAD = 6; + OPTIONS = 7; +} + +message HttpLikeRequest { + HttpMethod method = 1; // GET/POST/... + string path = 2; // request path e.g. "/orders/add" + map headers = 3; // "content-type": "application/json" + bytes body = 4; // body (e.g. JSON) +} + +message HttpLikeResponse { + int32 status = 1; + map headers = 2; + bytes body = 3; +} + + // ====================================== // ENUMS // ====================================== enum MessageEvent { - // Unspecified event type - MESSAGE_EVENT_UNSPECIFIED = 0; - // Default value, represents an empty event MESSAGE_EVENT_EMPTY = 0; diff --git a/Tokenization/backend/wrapper/src/client/commands/newToken/newToken.handler.ts b/Tokenization/backend/wrapper/src/client/commands/newToken/newToken.handler.ts index ccfd5fc63..4fb7eb54b 100644 --- a/Tokenization/backend/wrapper/src/client/commands/newToken/newToken.handler.ts +++ b/Tokenization/backend/wrapper/src/client/commands/newToken/newToken.handler.ts @@ -46,7 +46,7 @@ export class NewTokenHandler implements CommandHandler { for (const dir of directions) { let conn = this.manager.getConnectionByAddress(targetAddress, dir); - conn ??= this.manager.createNewConnection(targetAddress, dir, token); + conn ??= await this.manager.createNewConnection(targetAddress, dir, token); conn.token = token; } } diff --git a/Tokenization/backend/wrapper/src/client/connection/Connection.ts b/Tokenization/backend/wrapper/src/client/connection/Connection.ts index 25247a8ed..3f27c31b5 100644 --- a/Tokenization/backend/wrapper/src/client/connection/Connection.ts +++ b/Tokenization/backend/wrapper/src/client/connection/Connection.ts @@ -13,7 +13,9 @@ */ import type { ConnectionDirection } from '../../models/message.model'; +import type { ConnectionHeaders, FetchOptions, FetchResponse } from '../../models/connection.model'; import { ConnectionStatus } from '../../models/connection.model'; +import * as grpc from '@grpc/grpc-js'; /** * @description This class represents a connection to a target client and manages sending messages to it. @@ -22,6 +24,7 @@ export class Connection { private _token: string; private _targetAddress: string; private _status: ConnectionStatus; + private _peerClient: any; public direction: ConnectionDirection; /** @@ -31,9 +34,10 @@ export class Connection { * @param targetAddress - The unique address of the target client. * @param direction - The direction of the connection (e.g., sending or receiving). */ - constructor(token: string, targetAddress: string, direction: ConnectionDirection) { + constructor(token: string, targetAddress: string, direction: ConnectionDirection, peerCtor: any) { this._token = token; this._targetAddress = targetAddress; + this._peerClient = new peerCtor(targetAddress, grpc.credentials.createInsecure()); this.direction = direction; this._status = ConnectionStatus.CONNECTED; @@ -72,6 +76,14 @@ export class Connection { return this._status; } + /** + * Sets the status of this connection. + * @param status The new status of the connection. + */ + public set status(status: ConnectionStatus) { + this._status = status; + } + /** * Returns target address for this Connection object * @returns Target address @@ -79,4 +91,48 @@ export class Connection { public get targetAddress(): string { return this._targetAddress; } + + /** + * "HTTP-like" fetch via gRPC protocol + * @returns Promise with peer's response + */ + public fetch(options: FetchOptions = {}): Promise { + if (!this._peerClient) { + return Promise.reject(new Error(`Peer client not attached for ${this.targetAddress}`)); + } + + // Build a request object + const method = (options.method ?? 'POST').toUpperCase(); + const path = options.path ?? '/'; + const headers: ConnectionHeaders = { ...(options.headers ?? {}) }; + + let bodyBuf: Buffer = Buffer.alloc(0); + const b = options.body; + if (b != null) { + if (Buffer.isBuffer(b)) bodyBuf = b; + else if (b instanceof Uint8Array) bodyBuf = Buffer.from(b); + else if (typeof b === 'string') bodyBuf = Buffer.from(b, 'utf8'); + else return Promise.reject(new Error('Body must be a string/Buffer/Uint8Array')); + } + + const req = { method, path, headers, body: bodyBuf }; + + // Return promise with response + return new Promise((resolve, reject) => { + this._peerClient.Fetch(req, (err: any, resp: any) => { + if (err) return reject(err); + + const resBody = resp?.body ? Buffer.from(resp.body) : Buffer.alloc(0); + const fetchResponse: FetchResponse = { + status: Number(resp?.status ?? 200), + headers: resp?.headers ?? {}, + body: resBody, + text: async () => resBody.toString('utf8'), + json: async () => JSON.parse(resBody.toString('utf8')), + }; + + resolve(fetchResponse); + }); + }); + } } diff --git a/Tokenization/backend/wrapper/src/client/connectionManager/ConnectionManager.ts b/Tokenization/backend/wrapper/src/client/connectionManager/ConnectionManager.ts index 57168742d..fdbd20263 100644 --- a/Tokenization/backend/wrapper/src/client/connectionManager/ConnectionManager.ts +++ b/Tokenization/backend/wrapper/src/client/connectionManager/ConnectionManager.ts @@ -21,10 +21,9 @@ import { LogManager } from '@aliceo2/web-ui'; import type { Command, CommandHandler } from 'models/commands.model'; import type { DuplexMessageEvent } from '../../models/message.model'; import { ConnectionDirection } from '../../models/message.model'; +import { ConnectionStatus } from '../../models/connection.model'; +import { peerListener } from '../../utils/connection/peerListener'; -/** - * @description Manages all the connection between clients and central system. - */ /** * Manages the lifecycle and connection logic for a gRPC client communicating with the central system. * @@ -45,6 +44,10 @@ export class ConnectionManager { private _centralConnection: CentralConnection; private _sendingConnections = new Map(); private _receivingConnections = new Map(); + private _wrapper: any; + private _peerCtor: any; + private _peerServer: grpc.Server | undefined; + private _baseAPIPath: string = ''; /** * Initializes a new instance of the ConnectionManager class. @@ -64,9 +67,10 @@ export class ConnectionManager { }); const proto = grpc.loadPackageDefinition(packageDef) as any; - const wrapper = proto.webui.tokenization; + this._wrapper = proto.webui.tokenization; + this._peerCtor = this._wrapper.Peer2Peer; - const client = new wrapper.CentralSystem(centralAddress, grpc.credentials.createInsecure()); + const client = new this._wrapper.CentralSystem(centralAddress, grpc.credentials.createInsecure()); // Event dispatcher for central system events this._centralDispatcher = new CentralCommandDispatcher(); @@ -109,13 +113,15 @@ export class ConnectionManager { * @param token Optional token for connection */ createNewConnection(address: string, direction: ConnectionDirection, token?: string) { - const conn = new Connection(token ?? '', address, direction); + const conn = new Connection(token ?? '', address, direction, this._peerCtor); if (direction === ConnectionDirection.RECEIVING) { this._receivingConnections.set(address, conn); } else { this._sendingConnections.set(address, conn); } + conn.status = ConnectionStatus.CONNECTED; + this._logger.infoMessage(`Connection with ${address} has been estabilished. Status: ${conn.status}`); return conn; } @@ -150,4 +156,26 @@ export class ConnectionManager { receiving: [...this._receivingConnections.values()], }; } + + /** Starts a listener server for p2p connections */ + public async listenForPeers(port: number, baseAPIPath?: string): Promise { + if (baseAPIPath) this._baseAPIPath = baseAPIPath; + + if (this._peerServer) { + this._peerServer.forceShutdown(); + this._peerServer = undefined; + } + + this._peerServer = new grpc.Server(); + this._peerServer.addService(this._wrapper.Peer2Peer.service, { + Fetch: async (call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => + peerListener(call, callback, this._logger, this._receivingConnections, this._peerCtor, this._baseAPIPath), + }); + + await new Promise((resolve, reject) => { + this._peerServer?.bindAsync(`localhost:${port}`, grpc.ServerCredentials.createInsecure(), (err) => (err ? reject(err) : resolve())); + }); + + this._logger.infoMessage(`Peer server listening on localhost:${port}`); + } } diff --git a/Tokenization/backend/wrapper/src/client/gRPCWrapper.ts b/Tokenization/backend/wrapper/src/client/gRPCWrapper.ts index 76f93bd60..0622afe48 100644 --- a/Tokenization/backend/wrapper/src/client/gRPCWrapper.ts +++ b/Tokenization/backend/wrapper/src/client/gRPCWrapper.ts @@ -14,9 +14,9 @@ import { ConnectionManager } from './connectionManager/ConnectionManager'; import { RevokeTokenHandler } from './commands/revokeToken/revokeToken.handler'; -import { DuplexMessageEvent } from '../models/message.model'; -import type { Connection } from './connection/Connection'; +import { ConnectionDirection, DuplexMessageEvent } from '../models/message.model'; import { NewTokenHandler } from './commands/newToken/newToken.handler'; +import type { Connection } from './connection/Connection'; /** * @description Wrapper class for managing secure gRPC wrapper. @@ -56,12 +56,37 @@ export class gRPCWrapper { } /** - * Starts the Connection Manager stream connection with Central System + * Connects to the central system using the underlying ConnectionManager. + * + * @remarks + * This method starts the duplex stream connection with the central gRPC server. */ public connectToCentralSystem() { this._connectionManager.connectToCentralSystem(); } + /** + * Establishes a new connection to a target client. + * + * @param address - The target address of the client. + * @param token - Optional authentication token for the connection. + * + * @returns A promise that resolves to the newly created connection ready to use for fetching data. + */ + public async connectToClient(address: string, token?: string): Promise { + return this._connectionManager.createNewConnection(address, ConnectionDirection.SENDING, token ?? ''); + } + + /** + * Starts a listener server for p2p connections. + * @param port The port number to bind the p2p server to. + * @param baseAPIPath Optional base API path to forward requests to e.g. '/api'. + * @returns A promise that resolves when the p2p listener server is started. + */ + public async listenForPeers(port: number, baseAPIPath?: string): Promise { + return this._connectionManager.listenForPeers(port, baseAPIPath); + } + /** * Returns all saved connections. * diff --git a/Tokenization/backend/wrapper/src/models/connection.model.ts b/Tokenization/backend/wrapper/src/models/connection.model.ts index 2bc8a6f98..eb9cb07c5 100644 --- a/Tokenization/backend/wrapper/src/models/connection.model.ts +++ b/Tokenization/backend/wrapper/src/models/connection.model.ts @@ -44,3 +44,35 @@ export enum ConnectionStatus { // The connection is refreshing its authentication token TOKEN_REFRESH = 'TOKEN_REFRESH', } + +export type ConnectionHeaders = Record; + +export type FetchOptions = { + method?: string; + path?: string; + headers?: ConnectionHeaders; + body?: string | Buffer | Uint8Array | null; +}; + +export type FetchResponse = { + status: number; + headers: ConnectionHeaders; + body: Buffer; + text: () => Promise; + json: () => Promise; +}; + +export type HttpLikeRequest = { + method: string; + path: string; + headers: Headers; + body: Buffer; + correlation_id?: string; + sequence_number?: number; +}; + +export type HttpLikeResponse = { + status: number; + headers: Headers; + body: Buffer; +}; diff --git a/Tokenization/backend/wrapper/src/models/message.model.ts b/Tokenization/backend/wrapper/src/models/message.model.ts index 918f48505..0eb67ec1e 100644 --- a/Tokenization/backend/wrapper/src/models/message.model.ts +++ b/Tokenization/backend/wrapper/src/models/message.model.ts @@ -23,7 +23,6 @@ * @property MESSAGE_EVENT_REVOKE_TOKEN: Event for revoking an existing token. */ export enum DuplexMessageEvent { - MESSAGE_EVENT_UNSPECIFIED = 'MESSAGE_EVENT_UNSPECIFIED', MESSAGE_EVENT_EMPTY = 'MESSAGE_EVENT_EMPTY', MESSAGE_EVENT_NEW_TOKEN = 'MESSAGE_EVENT_NEW_TOKEN', MESSAGE_EVENT_REVOKE_TOKEN = 'MESSAGE_EVENT_REVOKE_TOKEN', diff --git a/Tokenization/backend/wrapper/src/test/client/commands/newToken.test.ts b/Tokenization/backend/wrapper/src/test/client/commands/newToken.test.ts index 683943527..fd5c23215 100644 --- a/Tokenization/backend/wrapper/src/test/client/commands/newToken.test.ts +++ b/Tokenization/backend/wrapper/src/test/client/commands/newToken.test.ts @@ -18,6 +18,9 @@ import { Connection } from '../../../client/connection/Connection'; import { ConnectionManager } from '../../../client/connectionManager/ConnectionManager'; import { Command } from 'models/commands.model'; import { ConnectionDirection, DuplexMessageEvent } from '../../../models/message.model'; +import * as grpc from '@grpc/grpc-js'; +import * as protoLoader from '@grpc/proto-loader'; +import path from 'path'; /** * Helper to create a new token command with given address, direction, and token. @@ -36,6 +39,19 @@ function createEventMessage(targetAddress: string, connectionDirection: Connecti describe('NewTokenHandler', () => { let manager: ConnectionManager; + const protoPath = path.join(__dirname, '..', '..', '..', '..', '..', 'proto', 'wrapper.proto'); + const packageDef = protoLoader.loadSync(protoPath, { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + }); + + const proto = grpc.loadPackageDefinition(packageDef) as any; + const wrapper = proto.webui.tokenization; + const peerCtor = wrapper.Peer2Peer; + beforeEach(() => { manager = { sendingConnections: new Map(), @@ -49,7 +65,7 @@ describe('NewTokenHandler', () => { return undefined; }), createNewConnection: jest.fn(function (this: any, address: string, dir: ConnectionDirection, token: string) { - const conn = new Connection(token, address, dir); + const conn = new Connection(token, address, dir, peerCtor); if (dir === ConnectionDirection.SENDING) { this.sendingConnections.set(address, conn); } else { @@ -62,7 +78,7 @@ describe('NewTokenHandler', () => { it('should update token on existing SENDING connection', async () => { const targetAddress = 'peer-123'; - const conn = new Connection('old-token', targetAddress, ConnectionDirection.SENDING); + const conn = new Connection('old-token', targetAddress, ConnectionDirection.SENDING, peerCtor); (manager as any).sendingConnections.set(targetAddress, conn); const handler = new NewTokenHandler(manager); diff --git a/Tokenization/backend/wrapper/src/test/client/commands/revokeToken.test.ts b/Tokenization/backend/wrapper/src/test/client/commands/revokeToken.test.ts index b7d68ea5e..3764d1724 100644 --- a/Tokenization/backend/wrapper/src/test/client/commands/revokeToken.test.ts +++ b/Tokenization/backend/wrapper/src/test/client/commands/revokeToken.test.ts @@ -19,8 +19,24 @@ import { ConnectionManager } from '../../../client/connectionManager/ConnectionM import { ConnectionDirection, DuplexMessageEvent } from '../../../models/message.model'; import { ConnectionStatus } from '../../../models/connection.model'; import { Command } from 'models/commands.model'; +import * as grpc from '@grpc/grpc-js'; +import * as protoLoader from '@grpc/proto-loader'; +import path from 'path'; describe('RevokeToken', () => { + const protoPath = path.join(__dirname, '..', '..', '..', '..', '..', 'proto', 'wrapper.proto'); + const packageDef = protoLoader.loadSync(protoPath, { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + }); + + const proto = grpc.loadPackageDefinition(packageDef) as any; + const wrapper = proto.webui.tokenization; + const peerCtor = wrapper.Peer2Peer; + function createEventMessage(targetAddress: string) { return { event: DuplexMessageEvent.MESSAGE_EVENT_REVOKE_TOKEN, @@ -45,7 +61,7 @@ describe('RevokeToken', () => { it('should revoke token when connection found in sendingConnections', async () => { const targetAddress = 'peer-123'; - const conn = new Connection('valid-token', targetAddress, ConnectionDirection.SENDING); + const conn = new Connection('valid-token', targetAddress, ConnectionDirection.SENDING, peerCtor); (manager as any).sendingConnections!.set(targetAddress, conn); const handler = new RevokeTokenHandler(manager); @@ -59,7 +75,7 @@ describe('RevokeToken', () => { it('should revoke token when connection found in receivingConnections', async () => { const targetAddress = 'peer-456'; - const conn = new Connection('valid-token', targetAddress, ConnectionDirection.RECEIVING); + const conn = new Connection('valid-token', targetAddress, ConnectionDirection.RECEIVING, peerCtor); (manager as any).receivingConnections.set(targetAddress, conn); const handler = new RevokeTokenHandler(manager); diff --git a/Tokenization/backend/wrapper/src/test/client/connectionManager/ConnectionManager.test..ts b/Tokenization/backend/wrapper/src/test/client/connectionManager/ConnectionManager.test..ts deleted file mode 100644 index d68c39a47..000000000 --- a/Tokenization/backend/wrapper/src/test/client/connectionManager/ConnectionManager.test..ts +++ /dev/null @@ -1,152 +0,0 @@ -/** - * @license - * Copyright 2019-2020 CERN and copyright holders of ALICE O2. - * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. - * All rights not expressly granted are reserved. - * - * This software is distributed under the terms of the GNU General Public - * License v3 (GPL Version 3), copied verbatim in the file "COPYING". - * - * In applying this license CERN does not waive the privileges and immunities - * granted to it by virtue of its status as an Intergovernmental Organization - * or submit itself to any jurisdiction. - */ - -import * as grpc from '@grpc/grpc-js'; -import { ConnectionManager } from '../../../client/connectionManager/ConnectionManager'; -import { DuplexMessageEvent } from '../../../models/message.model'; - -// Mock duplex stream -const mockStream = { - on: jest.fn(), - end: jest.fn(), -}; - -// Mock gRPC client -const mockClient = { - ClientStream: jest.fn(() => mockStream), -}; - -// Mock CentralSystem constructor -const CentralSystemMock = jest.fn(() => mockClient); - -// Mock dispatcher -const mockDispatch = jest.fn(); -jest.mock('../../../client/ConnectionManager/EventManagement/CentralCommandDispatcher', () => ({ - CentralCommandDispatcher: jest.fn(() => ({ - dispatch: mockDispatch, - })), -})); - -// Mock logger -jest.mock( - '@aliceo2/web-ui', - () => ({ - LogManager: { - getLogger: () => ({ - infoMessage: jest.fn(), - }), - }, - }), - { virtual: true } -); - -// Mock gRPC proto loader and client -jest.mock('@grpc/proto-loader', () => ({ - loadSync: jest.fn(() => { - return {}; - }), -})); - -jest.mock('@grpc/grpc-js', () => { - const original = jest.requireActual('@grpc/grpc-js'); - return { - ...original, - credentials: { - createInsecure: jest.fn(), - }, - loadPackageDefinition: jest.fn(() => ({ - webui: { - tokenization: { - CentralSystem: CentralSystemMock, - }, - }, - })), - }; -}); - -describe('ConnectionManager', () => { - let conn: ConnectionManager; - - beforeEach(() => { - jest.clearAllMocks(); - conn = new ConnectionManager('dummy.proto', 'localhost:12345'); - }); - - test('should initialize client with correct address', () => { - const connManager = new ConnectionManager('dummy.proto', 'localhost:12345'); - expect(connManager).toBeDefined(); - expect(grpc.loadPackageDefinition).toHaveBeenCalled(); - expect(CentralSystemMock).toHaveBeenCalledWith('localhost:12345', undefined); - }); - - test('connectToCentralSystem() should set up stream listeners', () => { - const connManager = new ConnectionManager('dummy.proto', 'localhost:12345'); - connManager.connectToCentralSystem(); - - expect(mockClient.ClientStream).toHaveBeenCalled(); - expect(mockStream.on).toHaveBeenCalledWith('data', expect.any(Function)); - expect(mockStream.on).toHaveBeenCalledWith('end', expect.any(Function)); - expect(mockStream.on).toHaveBeenCalledWith('error', expect.any(Function)); - }); - - test('disconnectFromCentralSystem() should end stream', () => { - conn.connectToCentralSystem(); - conn.disconnectFromCentralSystem(); - - expect(mockStream.end).toHaveBeenCalled(); - }); - - test("should reconnect on stream 'end'", () => { - jest.useFakeTimers(); - conn.connectToCentralSystem(); - const onEnd = mockStream.on.mock.calls.find(([event]) => event === 'end')?.[1]; - - onEnd?.(); // simulate 'end' - jest.advanceTimersByTime(2000); - - expect(mockClient.ClientStream).toHaveBeenCalledTimes(2); - jest.useRealTimers(); - }); - - test("should reconnect on stream 'error'", () => { - jest.useFakeTimers(); - conn.connectToCentralSystem(); - const onError = mockStream.on.mock.calls.find(([event]) => event === 'error')?.[1]; - - onError?.(new Error('Simulated error')); - jest.advanceTimersByTime(2000); - - expect(mockClient.ClientStream).toHaveBeenCalledTimes(2); - jest.useRealTimers(); - }); - - test("should dispatch event when 'data' is received", () => { - conn.connectToCentralSystem(); - const onData = mockStream.on.mock.calls.find(([event]) => event === 'data')?.[1]; - - const mockMessage = { - event: DuplexMessageEvent.MESSAGE_EVENT_REVOKE_TOKEN, - data: { - revokeToken: { - token: 'abc123', - targetAddress: 'peer-123', - }, - }, - }; - - onData?.(mockMessage); - - expect(mockDispatch).toHaveBeenCalledWith(mockMessage); - }); -}); diff --git a/Tokenization/backend/wrapper/src/test/client/connectionManager/ConnectionManager.test.ts b/Tokenization/backend/wrapper/src/test/client/connectionManager/ConnectionManager.test.ts new file mode 100644 index 000000000..dff46c2c6 --- /dev/null +++ b/Tokenization/backend/wrapper/src/test/client/connectionManager/ConnectionManager.test.ts @@ -0,0 +1,399 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +import * as grpc from '@grpc/grpc-js'; + +// Capture service impl registered on grpc.Server.addService +let capturedServerImpl: any | null = null; + +jest.mock('@grpc/proto-loader', () => ({ + loadSync: jest.fn(() => ({})), +})); + +const CentralSystemClientMock = jest.fn(); +const Peer2PeerCtorMock = jest.fn(); + +// Mock @grpc/grpc-js +jest.mock('@grpc/grpc-js', () => { + const original = jest.requireActual('@grpc/grpc-js'); + + const mockServer = { + addService: jest.fn((_svc: any, impl: any) => { + capturedServerImpl = impl; + }), + bindAsync: jest.fn((_addr: string, _creds: any, cb: any) => cb(null)), + forceShutdown: jest.fn(), + }; + const ServerCtor = jest.fn(() => mockServer); + + const loadPackageDefinition = jest.fn(() => ({ + webui: { + tokenization: { + CentralSystem: CentralSystemClientMock, + Peer2Peer: Object.assign(Peer2PeerCtorMock, { + service: { + Fetch: { + path: '/webui.tokenization.Peer2Peer/Fetch', + requestStream: false, + responseStream: false, + requestSerialize: (x: any) => x, + requestDeserialize: (x: any) => x, + responseSerialize: (x: any) => x, + responseDeserialize: (x: any) => x, + }, + }, + }), + }, + }, + })); + + return { + ...original, + loadPackageDefinition, + credentials: { + createInsecure: jest.fn(() => ({})), + }, + ServerCredentials: { + createInsecure: jest.fn(() => ({})), + }, + status: { + ...original.status, + INTERNAL: 13, + }, + Server: ServerCtor, + }; +}); + +// Mock CentralCommandDispatcher +const dispatcherRegisterMock = jest.fn(); +jest.mock( + '../../../client/connectionManager/eventManagement/CentralCommandDispatcher', + () => ({ + CentralCommandDispatcher: jest.fn().mockImplementation(() => ({ + register: dispatcherRegisterMock, + })), + }), + { virtual: true } +); + +// Mock CentralConnection +const centralStartMock = jest.fn(); +const centralDisconnectMock = jest.fn(); +jest.mock( + '../../../client/connectionManager/CentralConnection', + () => ({ + CentralConnection: jest.fn().mockImplementation(() => ({ + start: centralStartMock, + disconnect: centralDisconnectMock, + })), + }), + { virtual: true } +); + +// Track Connection instances and allow status changes +const createdConnections: any[] = []; +const connectionCtorMock = jest.fn().mockImplementation(function (this: any, token: string, address: string, direction: any, peerCtor: any) { + this._token = token; + this._address = address; + this.direction = direction; + this.status = undefined; + this.targetAddress = address; + this.token = token; + Object.defineProperty(this, 'status', { + get: () => this._status, + set: (v) => (this._status = v), + configurable: true, + }); + createdConnections.push({ token, address, direction, peerCtor, instance: this }); +}); +jest.mock( + '../../../client/connection/Connection', + () => ({ + Connection: connectionCtorMock, + }), + { virtual: true } +); + +const infoMessageMock = jest.fn(); +const errorMessageMock = jest.fn(); +jest.mock( + '@aliceo2/web-ui', + () => ({ + LogManager: { + getLogger: () => ({ + infoMessage: infoMessageMock, + errorMessage: errorMessageMock, + debugMessage: jest.fn(), + }), + }, + }), + { virtual: true } +); + +import { ConnectionManager } from '../../../client/connectionManager/ConnectionManager'; +import { ConnectionDirection } from '../../../models/message.model'; +import { ConnectionStatus } from '../../../models/connection.model'; + +describe('ConnectionManager', () => { + beforeEach(() => { + jest.clearAllMocks(); + capturedServerImpl = null; + createdConnections.length = 0; + // @ts-ignore + global.fetch = jest.fn(); + }); + + afterAll(() => { + // @ts-ignore + delete global.fetch; + }); + + test('constructor: loads proto, builds wrapper/peerCtor and CentralSystem client', () => { + const cm = new ConnectionManager('proto/file.proto', 'central:5555'); + expect(cm).toBeDefined(); + + expect((grpc as any).loadPackageDefinition).toHaveBeenCalled(); + expect(CentralSystemClientMock).toHaveBeenCalledWith('central:5555', expect.any(Object)); + expect(grpc.credentials.createInsecure).toHaveBeenCalled(); + }); + + test('registerCommandHandlers: calls dispatcher.register for each item', () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + dispatcherRegisterMock.mockClear(); + + const handlers = [ + { event: 1 as any, handler: { handle: jest.fn() } as any }, + { event: 2 as any, handler: { handle: jest.fn() } as any }, + ]; + + cm.registerCommandHandlers(handlers); + + expect(dispatcherRegisterMock).toHaveBeenCalledTimes(2); + expect(dispatcherRegisterMock).toHaveBeenCalledWith(handlers[0].event, handlers[0].handler); + expect(dispatcherRegisterMock).toHaveBeenCalledWith(handlers[1].event, handlers[1].handler); + }); + + test('connectToCentralSystem/disconnectFromCentralSystem delegate to CentralConnection', () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + // @ts-ignore + cm['_peerCtor'] = Peer2PeerCtorMock; + cm.connectToCentralSystem(); + expect(centralStartMock).toHaveBeenCalled(); + + cm.disconnectFromCentralSystem(); + expect(centralDisconnectMock).toHaveBeenCalled(); + }); + + test('createNewConnection: adds to sending map, sets CONNECTED, logs', () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + // @ts-ignore + cm['_peerCtor'] = Peer2PeerCtorMock; + const conn = cm.createNewConnection('peer-A', ConnectionDirection.SENDING, 'tok123'); + + expect(connectionCtorMock).toHaveBeenCalledWith('tok123', 'peer-A', ConnectionDirection.SENDING, expect.any(Function)); + expect(conn.status).toBe(ConnectionStatus.CONNECTED); + + // Exposed via connections getter + const { sending, receiving } = cm.connections; + expect(sending.length).toBe(1); + expect(receiving.length).toBe(0); + + expect(infoMessageMock).toHaveBeenCalledWith(expect.stringContaining('Connection with peer-A has been estabilished')); + }); + + test('createNewConnection: adds to receiving map if direction is RECEIVING', () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + cm.createNewConnection('peer-B', ConnectionDirection.RECEIVING); + + const { sending, receiving } = cm.connections; + expect(sending.length).toBe(0); + expect(receiving.length).toBe(1); + }); + + test('getConnectionByAddress: returns by direction. Logs on invalid direction', () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + const s = cm.createNewConnection('s-1', ConnectionDirection.SENDING); + const r = cm.createNewConnection('r-1', ConnectionDirection.RECEIVING); + + expect(cm.getConnectionByAddress('s-1', ConnectionDirection.SENDING)).toBe(s); + expect(cm.getConnectionByAddress('r-1', ConnectionDirection.RECEIVING)).toBe(r); + + errorMessageMock.mockClear(); + const invalid = cm.getConnectionByAddress('x', 999 as any); + expect(invalid).toBeUndefined(); + expect(errorMessageMock).toHaveBeenCalledWith('Invalid connection direction: 999'); + }); + + test('connections getter: returns arrays (copies) of maps', () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + cm.createNewConnection('a', ConnectionDirection.SENDING); + cm.createNewConnection('b', ConnectionDirection.RECEIVING); + + const { sending, receiving } = cm.connections; + expect(Array.isArray(sending)).toBe(true); + expect(Array.isArray(receiving)).toBe(true); + expect(sending.length).toBe(1); + expect(receiving.length).toBe(1); + }); + + test('listenForPeers: creates server, registers service, binds & logs', async () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + await cm.listenForPeers(50099, 'http://localhost:41000/api/'); + + const ServerCtor = (grpc.Server as any).mock; + expect(ServerCtor).toBeDefined(); + expect(ServerCtor.calls.length).toBeGreaterThan(0); + + const serverInstance = ServerCtor.results[0].value; + expect(serverInstance.addService).toHaveBeenCalled(); + expect(serverInstance.bindAsync).toHaveBeenCalledWith('localhost:50099', expect.anything(), expect.any(Function)); + expect(infoMessageMock).toHaveBeenCalledWith('Peer server listening on localhost:50099'); + + // Service impl captured + expect(capturedServerImpl).toBeTruthy(); + expect(typeof capturedServerImpl.Fetch).toBe('function'); + }); + + test('listenForPeers: calling twice shuts previous server down', async () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + await cm.listenForPeers(50100, 'http://localhost:41000/api/'); + const firstServer = (grpc.Server as any).mock.results[0].value; + + await cm.listenForPeers(50101, 'http://localhost:41000/api/'); + expect(firstServer.forceShutdown).toHaveBeenCalled(); + }); + + test('p2p Fetch: registers new incoming receiving connection, forwards to local API, maps response', async () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + await cm.listenForPeers(50102, 'http://local/api/'); + + // Prepare incoming call and callback + const call = { + getPeer: () => 'client-42', + request: { + method: 'post', + path: 'echo', + headers: { 'content-type': 'application/json' }, + body: Buffer.from(JSON.stringify({ ping: true })), + }, + } as any; + const callback = jest.fn(); + + // Mock fetch response + // @ts-ignore + global.fetch.mockResolvedValue({ + status: 202, + headers: { + forEach: (fn: (v: string, k: string) => void) => { + fn('application/json', 'content-type'); + fn('abc', 'x-extra'); + }, + }, + arrayBuffer: async () => Buffer.from(JSON.stringify({ ok: 1 })), + }); + + const before = cm.connections.receiving.length; + await capturedServerImpl.Fetch(call, callback); + + expect(global.fetch).toHaveBeenCalledWith('http://local/api/echo', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ ping: true }), + }); + + // Response mapped back to gRPC + expect(callback).toHaveBeenCalledWith(null, { + status: 202, + headers: { 'content-type': 'application/json', 'x-extra': 'abc' }, + body: expect.any(Buffer), + }); + + // Receiving connection was created & stored + const after = cm.connections.receiving.length; + expect(after).toBeGreaterThan(before); + + const found = cm.getConnectionByAddress('client-42', ConnectionDirection.RECEIVING); + expect(found).toBeDefined(); + expect(infoMessageMock).toHaveBeenCalledWith(expect.stringContaining('Incoming request from client-42')); + expect(infoMessageMock).toHaveBeenCalledWith(expect.stringContaining('New incoming connection registered for: client-42')); + }); + + test('p2p Fetch: uses existing receiving connection when present (no duplicate creation)', async () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + await cm.listenForPeers(50103, 'http://local/api/'); + + cm.createNewConnection('client-77', ConnectionDirection.RECEIVING); + + // @ts-ignore + global.fetch.mockResolvedValue({ + status: 200, + headers: { forEach: (fn: any) => fn('text/plain', 'content-type') }, + arrayBuffer: async () => Buffer.from('ok'), + }); + + const call = { + getPeer: () => 'client-77', + request: { method: 'get', path: 'pong', headers: {}, body: undefined }, + } as any; + + const callback = jest.fn(); + + const before = cm.connections.receiving.length; + await capturedServerImpl.Fetch(call, callback); + + // No new receiving connection added + const after = cm.connections.receiving.length; + expect(after).toBe(before); + + // Forwarded with GET and no body + expect(global.fetch).toHaveBeenCalledWith('http://local/api/pong', { + method: 'GET', + headers: {}, + body: undefined, + }); + + expect(callback).toHaveBeenCalledWith( + null, + expect.objectContaining({ + status: 200, + headers: { 'content-type': 'text/plain' }, + body: expect.any(Buffer), + }) + ); + }); + + test('p2p Fetch: on forward error returns INTERNAL and logs error', async () => { + const cm = new ConnectionManager('p.proto', 'c:1'); + await cm.listenForPeers(50104, 'http://local/api/'); + + // @ts-ignore + global.fetch.mockRejectedValue(new Error('err')); + + const call = { + getPeer: () => 'err-client', + request: { method: 'get', path: 'fail', headers: {} }, + } as any; + const callback = jest.fn(); + + await capturedServerImpl.Fetch(call, callback); + + expect(errorMessageMock).toHaveBeenCalledWith(expect.stringContaining('Error forwarding request')); + expect(callback).toHaveBeenCalledWith( + expect.objectContaining({ + code: grpc.status.INTERNAL, + message: 'err', + }) + ); + }); +}); diff --git a/Tokenization/backend/wrapper/src/test/connection/Connection.test.ts b/Tokenization/backend/wrapper/src/test/connection/Connection.test.ts new file mode 100644 index 000000000..8937e9d5e --- /dev/null +++ b/Tokenization/backend/wrapper/src/test/connection/Connection.test.ts @@ -0,0 +1,211 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +import { Connection } from '../../client/connection/Connection'; +import { ConnectionStatus } from '../../models/connection.model'; + +const FAKE_DIRECTION: any = 'SENDING'; + +let lastPeerClient: any; +const PeerCtorMock = jest.fn((_addr: string, _creds: any) => { + lastPeerClient = { + Fetch: jest.fn(), + }; + return lastPeerClient; +}); + +jest.mock( + '@grpc/grpc-js', + () => { + const original = jest.requireActual('@grpc/grpc-js'); + return { + ...original, + credentials: { + createInsecure: jest.fn(() => ({ insecure: true })), + }, + }; + }, + { virtual: true } +); + +import * as grpc from '@grpc/grpc-js'; + +describe('Connection', () => { + beforeEach(() => { + jest.clearAllMocks(); + lastPeerClient = undefined; + }); + + test('constructor should create connection and set base state correctly', () => { + const conn = new Connection('tok', 'peer:50051', FAKE_DIRECTION, PeerCtorMock); + + expect(grpc.credentials.createInsecure).toHaveBeenCalled(); + expect(PeerCtorMock).toHaveBeenCalledWith('peer:50051', { insecure: true }); + + expect(conn.token).toBe('tok'); + expect(conn.targetAddress).toBe('peer:50051'); + expect(conn.status).toBe(ConnectionStatus.CONNECTED); + expect(conn.direction).toBe(FAKE_DIRECTION); + }); + + test('getter/setter for token should work', () => { + const conn = new Connection('old', 'peer:1', FAKE_DIRECTION, PeerCtorMock); + expect(conn.token).toBe('old'); + conn.token = 'new-token'; + expect(conn.token).toBe('new-token'); + }); + + test('handleRevokeToken should clear token and status to UNAUTHORIZED', () => { + const conn = new Connection('secret', 'peer:x', FAKE_DIRECTION, PeerCtorMock); + conn.handleRevokeToken(); + expect(conn.token).toBe(''); + expect(conn.status).toBe(ConnectionStatus.UNAUTHORIZED); + }); + + test('getter/setter for status should work', () => { + const conn = new Connection('t', 'a', FAKE_DIRECTION, PeerCtorMock); + conn.status = ConnectionStatus.UNAUTHORIZED; + expect(conn.status).toBe(ConnectionStatus.UNAUTHORIZED); + conn.status = ConnectionStatus.CONNECTED; + expect(conn.status).toBe(ConnectionStatus.CONNECTED); + }); + + test('getter for targetAddress should work', () => { + const conn = new Connection('t', 'host:1234', FAKE_DIRECTION, PeerCtorMock); + expect(conn.targetAddress).toBe('host:1234'); + }); + + test('fetch should throw if peer client is not attached', async () => { + const conn = new Connection('t', 'addr', FAKE_DIRECTION, PeerCtorMock); + // @ts-ignore + conn['_peerClient'] = undefined; + + await expect(conn.fetch()).rejects.toThrow('Peer client not attached for addr'); + }); + + test('fetch with defaults should work', async () => { + const conn = new Connection('t', 'addr', FAKE_DIRECTION, PeerCtorMock); + + lastPeerClient.Fetch.mockImplementation((req: any, cb: any) => { + try { + expect(req).toEqual({ + method: 'POST', + path: '/', + headers: {}, + body: Buffer.alloc(0), + }); + cb(null, { status: 200, headers: {}, body: Buffer.alloc(0) }); + } catch (e) { + cb(e); + } + }); + + const resp = await conn.fetch(); + expect(resp.status).toBe(200); + }); + + test('fetch builds request correctly and returns response', async () => { + const conn = new Connection('t', 'addr', FAKE_DIRECTION, PeerCtorMock); + const body = Buffer.from('abc'); + + lastPeerClient.Fetch.mockImplementation((req: any, cb: any) => { + try { + expect(req.method).toBe('PUT'); + expect(req.path).toBe('/api/a'); + expect(req.headers).toEqual({ 'x-a': '1' }); + expect(Buffer.isBuffer(req.body)).toBe(true); + expect(req.body.equals(body)).toBe(true); + cb(null, { + status: 201, + headers: { 'content-type': 'text/plain' }, + body: Buffer.from('ok'), + }); + } catch (e) { + cb(e); + } + }); + + const res = await conn.fetch({ method: 'put', path: '/api/a', headers: { 'x-a': '1' }, body }); + expect(res.status).toBe(201); + expect(await res.text()).toBe('ok'); + }); + + test('fetch should convert Uint8Array to Buffer', async () => { + const conn = new Connection('t', 'addr', FAKE_DIRECTION, PeerCtorMock); + const body = new Uint8Array([1, 2, 3]); + + lastPeerClient.Fetch.mockImplementation((req: any, cb: any) => { + try { + expect(Buffer.isBuffer(req.body)).toBe(true); + expect(req.body.equals(Buffer.from([1, 2, 3]))).toBe(true); + cb(null, { status: 200, headers: {}, body: Buffer.alloc(0) }); + } catch (e) { + cb(e); + } + }); + + const res = await conn.fetch({ body }); + expect(res.status).toBe(200); + }); + + test('fetch should convert string to Buffer', async () => { + const conn = new Connection('t', 'addr', FAKE_DIRECTION, PeerCtorMock); + const body = 'żółć & äöü'; // handling special chars + + lastPeerClient.Fetch.mockImplementation((req: any, cb: any) => { + try { + expect(req.body.equals(Buffer.from(body, 'utf8'))).toBe(true); + cb(null, { status: 200, headers: {}, body: Buffer.from('{"ok":true}') }); + } catch (e) { + cb(e); + } + }); + + const res = await conn.fetch({ method: 'post', path: '/p', headers: {}, body }); + expect(await res.json()).toEqual({ ok: true }); + }); + + test('fetch should reject if body is not allowed', async () => { + const conn = new Connection('t', 'addr', FAKE_DIRECTION, PeerCtorMock); + // @ts-ignore + await expect(conn.fetch({ body: { not: 'allowed' } })).rejects.toThrow('Body must be a string/Buffer/Uint8Array'); + }); + + test('fetch should propagate errors from peer', async () => { + const conn = new Connection('t', 'addr', FAKE_DIRECTION, PeerCtorMock); + const err = new Error('err'); + lastPeerClient.Fetch.mockImplementation((_req: any, cb: any) => cb(err)); + + await expect(conn.fetch({ method: 'GET', path: '/x' })).rejects.toThrow('err'); + }); + + test('fetch should map response', async () => { + const conn = new Connection('t', 'addr', FAKE_DIRECTION, PeerCtorMock); + + const payload = { a: 1, b: 'x' }; + lastPeerClient.Fetch.mockImplementation((_req: any, cb: any) => + cb(null, { + headers: { 'x-k': 'v' }, + body: Buffer.from(JSON.stringify(payload)), + }) + ); + + const res = await conn.fetch({ method: 'GET' }); + expect(res.status).toBe(200); + expect(res.headers).toEqual({ 'x-k': 'v' }); + expect(Buffer.isBuffer(res.body)).toBe(true); + expect(await res.text()).toBe(JSON.stringify(payload)); + expect(await res.json()).toEqual(payload); + }); +}); diff --git a/Tokenization/backend/wrapper/src/utils/connection/peerListener.ts b/Tokenization/backend/wrapper/src/utils/connection/peerListener.ts new file mode 100644 index 000000000..5fafb4f1d --- /dev/null +++ b/Tokenization/backend/wrapper/src/utils/connection/peerListener.ts @@ -0,0 +1,82 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ +import * as grpc from '@grpc/grpc-js'; +import { Connection } from '../../client/connection/Connection'; +import { ConnectionDirection } from '../../models/message.model'; +import { ConnectionStatus } from '../../models/connection.model'; + +/** + * Listens for incoming gRPC requests and forwards them to the local API endpoint. + * Creates a new incoming connection if one doesn't exist yet. + * + * @param call - The gRPC unary call object containing the request. + * @param callback - The callback function to be called with the response. + * @param logger - The logger object to write info and error messages. + * @param receivingConnections - The map of existing incoming connections. + * @param peerCtor - The constructor function for the peer client. + * @param baseAPIPath - The base path of the local API endpoint. + */ +export const peerListener = async ( + call: grpc.ServerUnaryCall, + callback: grpc.sendUnaryData, + logger: any, + receivingConnections: Map, + peerCtor: any, + baseAPIPath: string +) => { + try { + const clientAddress = call.getPeer(); + logger.infoMessage(`Incoming request from ${clientAddress}`); + + let conn: Connection | undefined = receivingConnections.get(clientAddress); + + if (!conn) { + conn = new Connection('', clientAddress, ConnectionDirection.RECEIVING, peerCtor); + conn.status = ConnectionStatus.CONNECTED; + receivingConnections.set(clientAddress, conn); + logger.infoMessage(`New incoming connection registered for: ${clientAddress}`); + } + + // Create request to forward to local API endpoint + const method = String(call.request?.method ?? 'POST').toUpperCase(); + const url = baseAPIPath + (call.request?.path ?? ''); + const headers: { [key: string]: string } = call.request?.headers; + const body = call.request?.body ? Buffer.from(call.request.body).toString('utf-8') : undefined; + + logger.infoMessage(`Received payload from ${clientAddress}: \n${url}\n${JSON.stringify(headers)}\n${JSON.stringify(body)}\n`); + + const httpResp = await fetch(url, { + method, + headers: headers, + body, + }); + + const respHeaders: Record = {}; + httpResp.headers.forEach((v, k) => (respHeaders[k] = v)); + const resBody = Buffer.from(await httpResp.arrayBuffer()); + + callback(null, { + status: httpResp.status, + headers: respHeaders, + body: resBody, + }); + } catch (e: any) { + logger.errorMessage(`Error forwarding request: ${e ?? 'Uknown error'}`); + + callback({ + code: grpc.status.INTERNAL, + message: e?.message ?? 'forward error', + } as any); + } +};