diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index b66336e99..dbe45da0f 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -2,7 +2,7 @@ import { Collection, Db, ObjectId } from 'mongodb'; import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; import { WorkspaceWithTariffPlan } from '../types'; import HawkCatcher from '@hawk.so/nodejs'; -import { CriticalError } from '../../../lib/workerErrors'; +import { CriticalError, NonCriticalError } from '../../../lib/workerErrors'; /** * Class that implements methods used for interaction between limiter and db @@ -35,49 +35,24 @@ export class DbHelper { } /** - * Method that returns all workspaces with their tariff plans + * Method that yields all workspaces with their tariff plans */ - public async getWorkspacesWithTariffPlans():Promise; + public getWorkspacesWithTariffPlans(): AsyncGenerator; /** * Method that returns workspace with its tariff plan by its id * * @param id - id of the workspace to fetch */ - public async getWorkspacesWithTariffPlans(id: string):Promise; + public getWorkspacesWithTariffPlans(id: string): Promise; /** - * Returns workspace with its tariff plan by its id - * - * @param id - workspace id + * @param id - id of the workspace to fetch */ - public async getWorkspacesWithTariffPlans(id?: string):Promise { - /* eslint-disable-next-line */ - const queue: any[] = [ - { - $lookup: { - from: 'plans', - localField: 'tariffPlanId', - foreignField: '_id', - as: 'tariffPlan', - }, - }, - { - $unwind: { - path: '$tariffPlan', - }, - }, - ]; - + public getWorkspacesWithTariffPlans(id?: string): AsyncGenerator | Promise { if (id !== undefined) { - queue.unshift({ - $match: { - _id: new ObjectId(id), - }, - }); + return this.getOneWorkspaceWithTariffPlan(id); } - const workspacesArray = await this.workspacesCollection.aggregate(queue).toArray(); - - return (id !== undefined) ? workspacesArray[0] : workspacesArray; + return this.yieldWorkspacesWithTariffPlans(); } /** @@ -172,4 +147,72 @@ export class DbHelper { return this.projectsCollection.find(query).toArray(); } + + /** + * Returns a single workspace with its tariff plan by id + * + * @param id - workspace id + */ + private async getOneWorkspaceWithTariffPlan(id: string): Promise { + const pipeline = [ + { + $match: { + _id: new ObjectId(id), + }, + }, + ...this.tariffPlanLookupPipeline(), + ]; + + const workspace = await this.workspacesCollection.aggregate(pipeline).next(); + + if (workspace === null) { + throw new NonCriticalError(`Workspace ${id} not found`, { + workspaceId: id, + }); + } + + return workspace; + } + + /** + * Yields all workspaces with their tariff plans one by one + */ + private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator { + const pipeline = this.tariffPlanLookupPipeline(); + const cursor = this.workspacesCollection.aggregate(pipeline); + + for await (const workspace of cursor) { + yield workspace; + } + } + + /* eslint-disable-next-line */ + private tariffPlanLookupPipeline(): any[] { + return [ + { + $lookup: { + from: 'plans', + localField: 'tariffPlanId', + foreignField: '_id', + as: 'tariffPlan', + }, + }, + { + $unwind: { + path: '$tariffPlan', + }, + }, + { + $project: { + _id: 1, + name: 1, + isBlocked: 1, + blockedDate: 1, + lastChargeDate: 1, + billingPeriodEventsCount: 1, + tariffPlan: 1, + }, + }, + ]; + } } \ No newline at end of file diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 9cb2abf69..82ed25392 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -189,16 +189,16 @@ export default class LimiterWorker extends Worker { private async handleRegularWorkspacesCheck(): Promise { let message = ''; - const workspaces = await this.dbHelper.getWorkspacesWithTariffPlans(); + const workspaces = this.dbHelper.getWorkspacesWithTariffPlans(); const updatedWorkspaces: WorkspaceWithTariffPlan[] = []; - await Promise.all(workspaces.map(async (workspace) => { + for await (const workspace of workspaces) { /** * If workspace is already blocked - do nothing */ if (workspace.isBlocked) { - return; + continue; } const workspaceProjects = await this.dbHelper.getProjects(workspace._id.toString()); @@ -211,7 +211,7 @@ export default class LimiterWorker extends Worker { * If there are no projects to update - move on to next workspace */ if (projectsToUpdate.length === 0) { - return; + continue; } /** @@ -223,12 +223,12 @@ export default class LimiterWorker extends Worker { updatedWorkspace.isBlocked = true; updatedWorkspace.blockedDate = new Date(); - this.redis.appendBannedProjects(projectIds); + await this.redis.appendBannedProjects(projectIds); message += this.formSingleWorkspaceMessage(updatedWorkspace, projectsToUpdate, 'blocked'); } - })); + } - this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces); + await this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces); this.sendRegularReport(message); } diff --git a/workers/limiter/tests/dbHelper.test.ts b/workers/limiter/tests/dbHelper.test.ts index 3a4a49f80..947b070a7 100644 --- a/workers/limiter/tests/dbHelper.test.ts +++ b/workers/limiter/tests/dbHelper.test.ts @@ -159,16 +159,22 @@ describe('DbHelper', () => { /** * Act */ - const result = await dbHelper.getWorkspacesWithTariffPlans(); + const cursor = dbHelper.getWorkspacesWithTariffPlans(); + + const workspaces = []; + + for await (const workspace of cursor) { + workspaces.push(workspace); + } /** * Assert */ - expect(result).toHaveLength(2); - expect(result[0].tariffPlan).toBeDefined(); - expect(result[1].tariffPlan).toBeDefined(); - expect(result[0].tariffPlan.eventsLimit).toBe(10); - expect(result[1].tariffPlan.eventsLimit).toBe(10000); + expect(workspaces).toHaveLength(2); + expect(workspaces[0].tariffPlan).toBeDefined(); + expect(workspaces[1].tariffPlan).toBeDefined(); + expect(workspaces[0].tariffPlan.eventsLimit).toBe(10); + expect(workspaces[1].tariffPlan.eventsLimit).toBe(10000); }); test('Should return single workspace with its tariff plan by id', async () => { diff --git a/workers/limiter/types/index.ts b/workers/limiter/types/index.ts index c04c3936f..e788e5dc4 100644 --- a/workers/limiter/types/index.ts +++ b/workers/limiter/types/index.ts @@ -3,4 +3,9 @@ import { PlanDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; /** * Workspace with its tariff plan */ -export type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme}; +export type WorkspaceWithTariffPlan = Pick< + WorkspaceDBScheme, + '_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' +> & { + tariffPlan: PlanDBScheme; +};