From d2c6fae5d18263da68077f72a6f12bd6c3c33b43 Mon Sep 17 00:00:00 2001 From: Kuchizu Date: Fri, 22 May 2026 17:31:46 +0300 Subject: [PATCH] perf(limiter): cache plans in memory instead of $lookup per workspace --- workers/limiter/src/dbHelper.ts | 154 +++++++++++++++++-------- workers/limiter/src/index.ts | 7 +- workers/limiter/tests/dbHelper.test.ts | 3 +- 3 files changed, 114 insertions(+), 50 deletions(-) diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index dbe45da0..4418975e 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -1,9 +1,24 @@ import { Collection, Db, ObjectId } from 'mongodb'; -import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; +import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; import { WorkspaceWithTariffPlan } from '../types'; import HawkCatcher from '@hawk.so/nodejs'; import { CriticalError, NonCriticalError } from '../../../lib/workerErrors'; +const WORKSPACE_PROJECTION = { + _id: 1, + name: 1, + isBlocked: 1, + blockedDate: 1, + lastChargeDate: 1, + billingPeriodEventsCount: 1, + tariffPlanId: 1, +} as const; + +type WorkspaceForLimiter = Pick< + WorkspaceDBScheme, + '_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' | 'tariffPlanId' +>; + /** * Class that implements methods used for interaction between limiter and db */ @@ -23,15 +38,43 @@ export class DbHelper { */ private workspacesCollection: Collection; + /** + * Collection with tariff plans + */ + private plansCollection: Collection; + + /** + * In-memory cache of tariff plans — avoids $lookup on the small plans collection per workspace + */ + private plans: PlanDBScheme[] = []; + /** * @param projects - projects collection * @param workspaces - workspaces collection + * @param plans - plans collection * @param eventsDbConnection - connection to events DB */ - constructor(projects: Collection, workspaces: Collection, eventsDbConnection: Db) { + constructor( + projects: Collection, + workspaces: Collection, + plans: Collection, + eventsDbConnection: Db + ) { this.eventsDbConnection = eventsDbConnection; this.projectsCollection = projects; this.workspacesCollection = workspaces; + this.plansCollection = plans; + } + + /** + * Fetches tariff plans from database and keeps them cached + */ + public async fetchPlans(): Promise { + this.plans = await this.plansCollection.find({}).toArray(); + + if (this.plans.length === 0) { + throw new CriticalError('Please add tariff plans to the database'); + } } /** @@ -148,22 +191,41 @@ export class DbHelper { return this.projectsCollection.find(query).toArray(); } + /** + * Returns plan from cache, refetches once on miss + * + * @param planId - id of the plan to find + */ + private async resolvePlan(planId: WorkspaceDBScheme['tariffPlanId']): Promise { + let plan = this.findPlanById(planId); + + if (plan) { + return plan; + } + + await this.fetchPlans(); + plan = this.findPlanById(planId); + + return plan ?? null; + } + + /** + * @param planId - id of the plan to find + */ + private findPlanById(planId: WorkspaceDBScheme['tariffPlanId']): PlanDBScheme | undefined { + return this.plans.find((plan) => plan._id.toString() === planId.toString()); + } + /** * Returns a single workspace with its tariff plan by id * * @param id - workspace id */ private async getOneWorkspaceWithTariffPlan(id: string): Promise { - const pipeline = [ - { - $match: { - _id: new ObjectId(id), - }, - }, - ...this.tariffPlanLookupPipeline(), - ]; - - const workspace = await this.workspacesCollection.aggregate(pipeline).next(); + const workspace = await this.workspacesCollection + .find({ _id: new ObjectId(id) }) + .project(WORKSPACE_PROJECTION) + .next(); if (workspace === null) { throw new NonCriticalError(`Workspace ${id} not found`, { @@ -171,48 +233,46 @@ export class DbHelper { }); } - return workspace; + const plan = await this.resolvePlan(workspace.tariffPlanId); + + if (!plan) { + throw new NonCriticalError(`Tariff plan ${workspace.tariffPlanId.toString()} not found for workspace ${id}`, { + workspaceId: id, + }); + } + + return { + ...workspace, + tariffPlan: plan, + }; } /** * Yields all workspaces with their tariff plans one by one */ private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator { - const pipeline = this.tariffPlanLookupPipeline(); - const cursor = this.workspacesCollection.aggregate(pipeline); + const cursor = this.workspacesCollection + .find({}) + .project(WORKSPACE_PROJECTION); for await (const workspace of cursor) { - yield workspace; - } - } + const plan = await this.resolvePlan(workspace.tariffPlanId); - /* eslint-disable-next-line */ - private tariffPlanLookupPipeline(): any[] { - return [ - { - $lookup: { - from: 'plans', - localField: 'tariffPlanId', - foreignField: '_id', - as: 'tariffPlan', - }, - }, - { - $unwind: { - path: '$tariffPlan', - }, - }, - { - $project: { - _id: 1, - name: 1, - isBlocked: 1, - blockedDate: 1, - lastChargeDate: 1, - billingPeriodEventsCount: 1, - tariffPlan: 1, - }, - }, - ]; + if (!plan) { + HawkCatcher.send( + new Error(`[Limiter] Tariff plan not found for workspace`), + { + workspaceId: workspace._id.toString(), + tariffPlanId: workspace.tariffPlanId?.toString(), + } + ); + continue; + } + + yield { + ...workspace, + tariffPlan: plan, + }; + } } -} \ No newline at end of file +} diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 82ed2539..6ed21cfe 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -3,7 +3,7 @@ import { Worker } from '../../../lib/worker'; import * as pkg from '../package.json'; import * as path from 'path'; import * as dotenv from 'dotenv'; -import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; +import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; import HawkCatcher from '@hawk.so/nodejs'; import { MS_IN_SEC } from '../../../lib/utils/consts'; import LimiterEvent, { BlockWorkspaceEvent, UnblockWorkspaceEvent } from '../types/eventTypes'; @@ -68,8 +68,11 @@ export default class LimiterWorker extends Worker { const projectsCollection = accountDbConnection.collection('projects'); const workspacesCollection = accountDbConnection.collection('workspaces'); + const plansCollection = accountDbConnection.collection('plans'); - this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, eventsDbConnection); + this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, plansCollection, eventsDbConnection); + + await this.dbHelper.fetchPlans(); await this.redis.initialize(); diff --git a/workers/limiter/tests/dbHelper.test.ts b/workers/limiter/tests/dbHelper.test.ts index 947b070a..c55ae131 100644 --- a/workers/limiter/tests/dbHelper.test.ts +++ b/workers/limiter/tests/dbHelper.test.ts @@ -130,7 +130,8 @@ describe('DbHelper', () => { await planCollection.deleteMany({}); await planCollection.insertMany(Object.values(mockedPlans)); - dbHelper = new DbHelper(projectCollection, workspaceCollection, db); + dbHelper = new DbHelper(projectCollection, workspaceCollection, planCollection, db); + await dbHelper.fetchPlans(); }, 30000); // 30 seconds timeout for MongoDB connection and setup beforeEach(async () => {