From 80d80fed89d065bdb08fc6578c855ba05f5c1ad9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=95=D0=B3=D0=BE=D1=80=20=D0=9A=D0=BE=D0=BD=D0=BE=D0=B2?= =?UTF-8?q?=D0=B0=D0=BB=D0=BE=D0=B2?= Date: Mon, 11 May 2026 18:42:28 +0300 Subject: [PATCH 1/5] imp(): fetch workspaces one by one --- workers/limiter/src/dbHelper.ts | 20 +++++++++++++------- workers/limiter/src/index.ts | 6 +++--- workers/limiter/tests/dbHelper.test.ts | 18 ++++++++++++------ 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index b66336e9..9815e2f2 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -35,22 +35,22 @@ export class DbHelper { } /** - * Method that returns all workspaces with their tariff plans + * Method that yields all workspaces with their tariff plans */ - public async getWorkspacesWithTariffPlans():Promise; + public getWorkspacesWithTariffPlans(): AsyncGenerator; /** * Method that returns workspace with its tariff plan by its id * * @param id - id of the workspace to fetch */ - public async getWorkspacesWithTariffPlans(id: string):Promise; + public async getWorkspacesWithTariffPlans(id: string): Promise; /** * Returns workspace with its tariff plan by its id * * @param id - workspace id */ - public async getWorkspacesWithTariffPlans(id?: string):Promise { - /* eslint-disable-next-line */ + public async *getWorkspacesWithTariffPlans(id?: string): AsyncGenerator | Promise { + /* eslint-disable-next-line */ const queue: any[] = [ { $lookup: { @@ -75,9 +75,15 @@ export class DbHelper { }); } - const workspacesArray = await this.workspacesCollection.aggregate(queue).toArray(); + const workspaces = this.workspacesCollection.aggregate(queue); - return (id !== undefined) ? workspacesArray[0] : workspacesArray; + if (id !== undefined) { + return await workspaces.next(); + } + + for await (const workspace of workspaces) { + yield workspace; + } } /** diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 9cb2abf6..9989ce70 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -189,11 +189,11 @@ export default class LimiterWorker extends Worker { private async handleRegularWorkspacesCheck(): Promise { let message = ''; - const workspaces = await this.dbHelper.getWorkspacesWithTariffPlans(); + const workspaces = this.dbHelper.getWorkspacesWithTariffPlans(); const updatedWorkspaces: WorkspaceWithTariffPlan[] = []; - await Promise.all(workspaces.map(async (workspace) => { + for await (const workspace of workspaces) { /** * If workspace is already blocked - do nothing */ @@ -226,7 +226,7 @@ export default class LimiterWorker extends Worker { this.redis.appendBannedProjects(projectIds); message += this.formSingleWorkspaceMessage(updatedWorkspace, projectsToUpdate, 'blocked'); } - })); + }; this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces); diff --git a/workers/limiter/tests/dbHelper.test.ts b/workers/limiter/tests/dbHelper.test.ts index 3a4a49f8..ea185359 100644 --- a/workers/limiter/tests/dbHelper.test.ts +++ b/workers/limiter/tests/dbHelper.test.ts @@ -159,16 +159,22 @@ describe('DbHelper', () => { /** * Act */ - const result = await dbHelper.getWorkspacesWithTariffPlans(); + const cursor = dbHelper.getWorkspacesWithTariffPlans(); + + const workspaces = [] + + for await (const workspace of cursor) { + workspaces.push(workspace) + } /** * Assert */ - expect(result).toHaveLength(2); - expect(result[0].tariffPlan).toBeDefined(); - expect(result[1].tariffPlan).toBeDefined(); - expect(result[0].tariffPlan.eventsLimit).toBe(10); - expect(result[1].tariffPlan.eventsLimit).toBe(10000); + expect(workspaces).toHaveLength(2); + expect(workspaces[0].tariffPlan).toBeDefined(); + expect(workspaces[1].tariffPlan).toBeDefined(); + expect(workspaces[0].tariffPlan.eventsLimit).toBe(10); + expect(workspaces[1].tariffPlan.eventsLimit).toBe(10000); }); test('Should return single workspace with its tariff plan by id', async () => { From d6b6c5e1cd8423b54b87977f359ec8ed50af9697 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=95=D0=B3=D0=BE=D1=80=20=D0=9A=D0=BE=D0=BD=D0=BE=D0=B2?= =?UTF-8?q?=D0=B0=D0=BB=D0=BE=D0=B2?= Date: Mon, 11 May 2026 19:00:23 +0300 Subject: [PATCH 2/5] imp(): add workspace projection --- workers/limiter/src/dbHelper.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index 9815e2f2..860c97f2 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -65,6 +65,14 @@ export class DbHelper { path: '$tariffPlan', }, }, + { + projection: { + _id: 1, + isBlocked: 1, + lastChargeDate: 1, + tariffPlan: 1, + } + } ]; if (id !== undefined) { From 0053c2c2e4a4e2cef89c5f8464ef3fcf1070674b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=95=D0=B3=D0=BE=D1=80=20=D0=9A=D0=BE=D0=BD=D0=BE=D0=B2?= =?UTF-8?q?=D0=B0=D0=BB=D0=BE=D0=B2?= Date: Tue, 12 May 2026 02:03:01 +0300 Subject: [PATCH 3/5] imp(): fix function overload --- workers/limiter/src/dbHelper.ts | 68 +++++++++++++++++++++------------ workers/limiter/src/index.ts | 8 ++-- 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index 860c97f2..a96d32d7 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -43,15 +43,48 @@ export class DbHelper { * * @param id - id of the workspace to fetch */ - public async getWorkspacesWithTariffPlans(id: string): Promise; + public getWorkspacesWithTariffPlans(id: string): Promise; + public getWorkspacesWithTariffPlans(id?: string): AsyncGenerator | Promise { + if (id !== undefined) { + return this.getOneWorkspaceWithTariffPlan(id); + } + + return this.yieldWorkspacesWithTariffPlans(); + } + /** - * Returns workspace with its tariff plan by its id + * Returns a single workspace with its tariff plan by id * * @param id - workspace id */ - public async *getWorkspacesWithTariffPlans(id?: string): AsyncGenerator | Promise { + private async getOneWorkspaceWithTariffPlan(id: string): Promise { + const pipeline = [ + { + $match: { + _id: new ObjectId(id), + }, + }, + ...this.tariffPlanLookupPipeline(), + ]; + + return this.workspacesCollection.aggregate(pipeline).next(); + } + + /** + * Yields all workspaces with their tariff plans one by one + */ + private async *yieldWorkspacesWithTariffPlans(): AsyncGenerator { + const pipeline = this.tariffPlanLookupPipeline(); + const cursor = this.workspacesCollection.aggregate(pipeline); + + for await (const workspace of cursor) { + yield workspace; + } + } + /* eslint-disable-next-line */ - const queue: any[] = [ + private tariffPlanLookupPipeline(): any[] { + return [ { $lookup: { from: 'plans', @@ -66,32 +99,17 @@ export class DbHelper { }, }, { - projection: { + $project: { _id: 1, + name: 1, isBlocked: 1, + blockedDate: 1, lastChargeDate: 1, + billingPeriodEventsCount: 1, tariffPlan: 1, - } - } - ]; - - if (id !== undefined) { - queue.unshift({ - $match: { - _id: new ObjectId(id), }, - }); - } - - const workspaces = this.workspacesCollection.aggregate(queue); - - if (id !== undefined) { - return await workspaces.next(); - } - - for await (const workspace of workspaces) { - yield workspace; - } + }, + ]; } /** diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 9989ce70..137fbdb6 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -198,7 +198,7 @@ export default class LimiterWorker extends Worker { * If workspace is already blocked - do nothing */ if (workspace.isBlocked) { - return; + continue; } const workspaceProjects = await this.dbHelper.getProjects(workspace._id.toString()); @@ -211,7 +211,7 @@ export default class LimiterWorker extends Worker { * If there are no projects to update - move on to next workspace */ if (projectsToUpdate.length === 0) { - return; + continue; } /** @@ -226,9 +226,9 @@ export default class LimiterWorker extends Worker { this.redis.appendBannedProjects(projectIds); message += this.formSingleWorkspaceMessage(updatedWorkspace, projectsToUpdate, 'blocked'); } - }; + } - this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces); + await this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces); this.sendRegularReport(message); } From 56d65460b97bb2d2048bc1073c008bfb938ee344 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=95=D0=B3=D0=BE=D1=80=20=D0=9A=D0=BE=D0=BD=D0=BE=D0=B2?= =?UTF-8?q?=D0=B0=D0=BB=D0=BE=D0=B2?= Date: Wed, 13 May 2026 02:41:48 +0300 Subject: [PATCH 4/5] fix(): eslint and types --- workers/limiter/src/dbHelper.ts | 133 +++++++++++++------------ workers/limiter/src/index.ts | 2 +- workers/limiter/tests/dbHelper.test.ts | 4 +- workers/limiter/types/index.ts | 7 +- 4 files changed, 81 insertions(+), 65 deletions(-) diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index a96d32d7..1468e24d 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -2,7 +2,7 @@ import { Collection, Db, ObjectId } from 'mongodb'; import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; import { WorkspaceWithTariffPlan } from '../types'; import HawkCatcher from '@hawk.so/nodejs'; -import { CriticalError } from '../../../lib/workerErrors'; +import { CriticalError, NonCriticalError } from '../../../lib/workerErrors'; /** * Class that implements methods used for interaction between limiter and db @@ -44,6 +44,9 @@ export class DbHelper { * @param id - id of the workspace to fetch */ public getWorkspacesWithTariffPlans(id: string): Promise; + /** + * @param id - id of the workspace to fetch + */ public getWorkspacesWithTariffPlans(id?: string): AsyncGenerator | Promise { if (id !== undefined) { return this.getOneWorkspaceWithTariffPlan(id); @@ -52,66 +55,6 @@ export class DbHelper { return this.yieldWorkspacesWithTariffPlans(); } - /** - * Returns a single workspace with its tariff plan by id - * - * @param id - workspace id - */ - private async getOneWorkspaceWithTariffPlan(id: string): Promise { - const pipeline = [ - { - $match: { - _id: new ObjectId(id), - }, - }, - ...this.tariffPlanLookupPipeline(), - ]; - - return this.workspacesCollection.aggregate(pipeline).next(); - } - - /** - * Yields all workspaces with their tariff plans one by one - */ - private async *yieldWorkspacesWithTariffPlans(): AsyncGenerator { - const pipeline = this.tariffPlanLookupPipeline(); - const cursor = this.workspacesCollection.aggregate(pipeline); - - for await (const workspace of cursor) { - yield workspace; - } - } - - /* eslint-disable-next-line */ - private tariffPlanLookupPipeline(): any[] { - return [ - { - $lookup: { - from: 'plans', - localField: 'tariffPlanId', - foreignField: '_id', - as: 'tariffPlan', - }, - }, - { - $unwind: { - path: '$tariffPlan', - }, - }, - { - $project: { - _id: 1, - name: 1, - isBlocked: 1, - blockedDate: 1, - lastChargeDate: 1, - billingPeriodEventsCount: 1, - tariffPlan: 1, - }, - }, - ]; - } - /** * Updates workspaces data in Database * @@ -204,4 +147,72 @@ export class DbHelper { return this.projectsCollection.find(query).toArray(); } + + /** + * Returns a single workspace with its tariff plan by id + * + * @param id - workspace id + */ + private async getOneWorkspaceWithTariffPlan(id: string): Promise { + const pipeline = [ + { + $match: { + _id: new ObjectId(id), + }, + }, + ...this.tariffPlanLookupPipeline(), + ]; + + const workspace = this.workspacesCollection.aggregate(pipeline).next(); + + if (workspace === null) { + throw new NonCriticalError(`Workspace ${id} not found`, { + workspaceId: id, + }); + } + + return workspace; + } + + /** + * Yields all workspaces with their tariff plans one by one + */ + private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator { + const pipeline = this.tariffPlanLookupPipeline(); + const cursor = this.workspacesCollection.aggregate(pipeline); + + for await (const workspace of cursor) { + yield workspace; + } + } + + /* eslint-disable-next-line */ + private tariffPlanLookupPipeline(): any[] { + return [ + { + $lookup: { + from: 'plans', + localField: 'tariffPlanId', + foreignField: '_id', + as: 'tariffPlan', + }, + }, + { + $unwind: { + path: '$tariffPlan', + }, + }, + { + $project: { + _id: 1, + name: 1, + isBlocked: 1, + blockedDate: 1, + lastChargeDate: 1, + billingPeriodEventsCount: 1, + tariffPlan: 1, + }, + }, + ]; + } } \ No newline at end of file diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 137fbdb6..82ed2539 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -223,7 +223,7 @@ export default class LimiterWorker extends Worker { updatedWorkspace.isBlocked = true; updatedWorkspace.blockedDate = new Date(); - this.redis.appendBannedProjects(projectIds); + await this.redis.appendBannedProjects(projectIds); message += this.formSingleWorkspaceMessage(updatedWorkspace, projectsToUpdate, 'blocked'); } } diff --git a/workers/limiter/tests/dbHelper.test.ts b/workers/limiter/tests/dbHelper.test.ts index ea185359..947b070a 100644 --- a/workers/limiter/tests/dbHelper.test.ts +++ b/workers/limiter/tests/dbHelper.test.ts @@ -161,10 +161,10 @@ describe('DbHelper', () => { */ const cursor = dbHelper.getWorkspacesWithTariffPlans(); - const workspaces = [] + const workspaces = []; for await (const workspace of cursor) { - workspaces.push(workspace) + workspaces.push(workspace); } /** diff --git a/workers/limiter/types/index.ts b/workers/limiter/types/index.ts index c04c3936..e788e5dc 100644 --- a/workers/limiter/types/index.ts +++ b/workers/limiter/types/index.ts @@ -3,4 +3,9 @@ import { PlanDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; /** * Workspace with its tariff plan */ -export type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme}; +export type WorkspaceWithTariffPlan = Pick< + WorkspaceDBScheme, + '_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' +> & { + tariffPlan: PlanDBScheme; +}; From 2ad9212b8bb485026a4ee35eb439504b7bf75d77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=95=D0=B3=D0=BE=D1=80=20=D0=9A=D0=BE=D0=BD=D0=BE=D0=B2?= =?UTF-8?q?=D0=B0=D0=BB=D0=BE=D0=B2?= Date: Wed, 13 May 2026 11:07:47 +0300 Subject: [PATCH 5/5] fix(): missing await --- workers/limiter/src/dbHelper.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index 1468e24d..dbe45da0 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -163,7 +163,7 @@ export class DbHelper { ...this.tariffPlanLookupPipeline(), ]; - const workspace = this.workspacesCollection.aggregate(pipeline).next(); + const workspace = await this.workspacesCollection.aggregate(pipeline).next(); if (workspace === null) { throw new NonCriticalError(`Workspace ${id} not found`, {