From 3529d73a669e7c95dc1b2192cd1522f8d67fe745 Mon Sep 17 00:00:00 2001 From: Kuchizu <70284260+Kuchizu@users.noreply.github.com> Date: Sat, 2 May 2026 20:06:18 +0300 Subject: [PATCH 1/4] feat(metrics): expose worker metrics over HTTP (#547) --- .env.sample | 11 ++- .env.test | 4 +- lib/metrics.ts | 199 +++++++++++++++++++++++++++++++++++-------------- runner.ts | 34 +++------ 4 files changed, 162 insertions(+), 86 deletions(-) diff --git a/.env.sample b/.env.sample index f5c19c7a..bcd8054f 100644 --- a/.env.sample +++ b/.env.sample @@ -25,11 +25,14 @@ EVENT_SECRET=hell # @codex_bot webhook for reports CODEX_BOT_WEBHOOK= -# address of prometheus pushgateway -PROMETHEUS_PUSHGATEWAY_URL= +# Port for VictoriaMetrics metrics endpoint. +PROMETHEUS_METRICS_PORT= -# pushgateway push interval in ms -PROMETHEUS_PUSHGATEWAY_INTERVAL=10000 +# Host for metrics endpoint binding. +PROMETHEUS_METRICS_HOST=0.0.0.0 + +# Path for metrics endpoint. +PROMETHEUS_METRICS_PATH=/metrics # Grouper memory log controls # Number of handled tasks between memory checkpoint logs diff --git a/.env.test b/.env.test index 237fe292..a60516a7 100644 --- a/.env.test +++ b/.env.test @@ -22,8 +22,8 @@ EVENT_SECRET=hell # @codex_bot webhook for reports CODEX_BOT_WEBHOOK= -# address of prometheus pushgateway -PROMETHEUS_PUSHGATEWAY= +# Port for VictoriaMetrics metrics endpoint. +PROMETHEUS_METRICS_PORT= # Feature flags diff --git a/lib/metrics.ts b/lib/metrics.ts index e26f208f..3cfb490f 100644 --- a/lib/metrics.ts +++ b/lib/metrics.ts @@ -1,16 +1,19 @@ import * as client from 'prom-client'; -import os from 'os'; -import { nanoid } from 'nanoid'; +import * as http from 'http'; import createLogger from './logger'; const register = new client.Registry(); const logger = createLogger(); -const DEFAULT_PUSH_INTERVAL_MS = 10_000; -const ID_SIZE = 5; -const METRICS_JOB_NAME = 'workers'; +const DEFAULT_METRICS_HOST = '0.0.0.0'; +const DEFAULT_METRICS_PATH = '/metrics'; +const MIN_PORT = 1; +const MAX_PORT = 65535; +const HTTP_OK = 200; +const HTTP_NOT_FOUND = 404; +const HTTP_INTERNAL_SERVER_ERROR = 500; -let pushInterval: NodeJS.Timeout | null = null; +let metricsServer: http.Server | null = null; let currentWorkerName = ''; client.collectDefaultMetrics({ register }); @@ -18,80 +21,160 @@ client.collectDefaultMetrics({ register }); export { register, client }; /** - * Parse push interval from environment. + * Parse metrics endpoint port from environment. */ -function getPushIntervalMs(): number { - const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL; - const parsedInterval = rawInterval === undefined - ? DEFAULT_PUSH_INTERVAL_MS - : Number(rawInterval); - - const interval = Number.isFinite(parsedInterval) && parsedInterval > 0 - ? parsedInterval - : DEFAULT_PUSH_INTERVAL_MS; - - if (rawInterval !== undefined && interval !== parsedInterval) { - logger.warn(`[metrics] invalid PROMETHEUS_PUSHGATEWAY_INTERVAL="${rawInterval}", fallback to ${DEFAULT_PUSH_INTERVAL_MS}ms`); +function getMetricsPort(): number | null { + const rawPort = process.env.PROMETHEUS_METRICS_PORT; + + if (!rawPort) { + return null; + } + + const port = Number(rawPort); + + if (!Number.isInteger(port) || port < MIN_PORT || port > MAX_PORT) { + logger.warn(`[metrics] invalid PROMETHEUS_METRICS_PORT="${rawPort}"; expected an integer between ${MIN_PORT} and ${MAX_PORT}`); + + return null; + } + + return port; +} + +/** + * Read metrics endpoint path from environment. + */ +function getMetricsPath(): string { + const rawPath = process.env.PROMETHEUS_METRICS_PATH; + + if (!rawPath) { + return DEFAULT_METRICS_PATH; + } + + const path = rawPath.trim(); + + if (!path) { + logger.warn(`[metrics] invalid PROMETHEUS_METRICS_PATH="${rawPath}", fallback to ${DEFAULT_METRICS_PATH}`); + + return DEFAULT_METRICS_PATH; + } + + if (!path.startsWith('/')) { + const normalizedPath = `/${path}`; + + logger.warn(`[metrics] normalized PROMETHEUS_METRICS_PATH from "${rawPath}" to "${normalizedPath}"`); + + return normalizedPath; } - return interval; + return path; } /** - * Stop periodic push to pushgateway. + * Stop HTTP metrics endpoint. */ -export function stopMetricsPushing(): void { - if (!pushInterval) { +export function stopMetricsServer(): void { + if (!metricsServer) { return; } - clearInterval(pushInterval); - pushInterval = null; - logger.info(`[metrics] stopped pushing metrics for worker=${currentWorkerName}`); - currentWorkerName = ''; + const serverToStop = metricsServer; + const stoppedWorkerName = currentWorkerName; + + if (!serverToStop.listening) { + logger.info(`[metrics] endpoint already stopped for worker=${stoppedWorkerName}`); + + if (metricsServer === serverToStop) { + metricsServer = null; + currentWorkerName = ''; + } + + return; + } + + serverToStop.close((error) => { + if (error) { + logger.error(`[metrics] failed to stop endpoint for worker=${stoppedWorkerName}: ${error.message}`); + + return; + } + + if (metricsServer === serverToStop) { + metricsServer = null; + currentWorkerName = ''; + } + + logger.info(`[metrics] stopped endpoint for worker=${stoppedWorkerName}`); + }); } /** - * Start periodic push to pushgateway. + * Start HTTP metrics endpoint for scraper-based monitoring. * - * @param workerName - name of the worker for grouping. + * @param workerName - name of the worker for default metric labels. */ -export function startMetricsPushing(workerName: string): () => void { - const url = process.env.PROMETHEUS_PUSHGATEWAY_URL; +export function startMetricsServer(workerName: string): () => void { + const port = getMetricsPort(); - if (!url) { - return stopMetricsPushing; + if (!port) { + return stopMetricsServer; } - if (pushInterval) { - logger.warn(`[metrics] pushing is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`); + if (metricsServer) { + logger.warn(`[metrics] endpoint is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`); - return stopMetricsPushing; + return stopMetricsServer; } - const interval = getPushIntervalMs(); - const hostname = os.hostname(); - const id = nanoid(ID_SIZE); - const gateway = new client.Pushgateway(url, undefined, register); + const host = process.env.PROMETHEUS_METRICS_HOST || DEFAULT_METRICS_HOST; + const path = getMetricsPath(); + + register.setDefaultLabels({ worker: workerName }); + const server = http.createServer(async (request, response) => { + const requestPath = request.url?.split('?')[0]; + + if (requestPath === '/-/healthy') { + response.writeHead(HTTP_OK, { 'Content-Type': 'text/plain' }); + response.end('ok'); + + return; + } + + if (request.method !== 'GET' || requestPath !== path) { + response.writeHead(HTTP_NOT_FOUND, { 'Content-Type': 'text/plain' }); + response.end('not found'); + + return; + } + + try { + response.writeHead(HTTP_OK, { 'Content-Type': register.contentType }); + response.end(await register.metrics()); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + + logger.error(`[metrics] failed to render metrics: ${message}`); + response.writeHead(HTTP_INTERNAL_SERVER_ERROR, { 'Content-Type': 'text/plain' }); + response.end('metrics error'); + } + }); + + server.on('error', (error) => { + logger.error(`[metrics] endpoint error for worker=${workerName}: ${error.message}`); + + if (metricsServer === server) { + metricsServer = null; + currentWorkerName = ''; + } + }); + + metricsServer = server; currentWorkerName = workerName; - logger.info(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id}, worker: ${workerName})`); - - pushInterval = setInterval(() => { - gateway.pushAdd({ - jobName: METRICS_JOB_NAME, - groupings: { - worker: workerName, - host: hostname, - id, - }, - }, (err) => { - if (err) { - logger.error(`Metrics push error: ${err.message || err}`); - } - }); - }, interval); - - return stopMetricsPushing; + server.listen(port, host, () => { + logger.info(`[metrics] endpoint started for worker=${workerName} at http://${host}:${port}${path}`); + }); + + return stopMetricsServer; } diff --git a/runner.ts b/runner.ts index e68fd090..1c19f97b 100644 --- a/runner.ts +++ b/runner.ts @@ -1,15 +1,8 @@ import * as utils from './lib/utils'; - -/* Prometheus client for pushing metrics to the pushgateway */ -// import os from 'os'; -// import * as promClient from 'prom-client'; -// import gcStats from 'prometheus-gc-stats'; -// import { nanoid } from 'nanoid'; -// import * as url from 'url'; import { Worker } from './lib/worker'; import HawkCatcher from '@hawk.so/nodejs'; import * as dotenv from 'dotenv'; -import { startMetricsPushing } from './lib/metrics'; +import { startMetricsServer } from './lib/metrics'; dotenv.config(); @@ -24,15 +17,15 @@ const BEGINNING_OF_ARGS = 2; */ const workerNames = process.argv.slice(BEGINNING_OF_ARGS); -/** +/** * Initialize HawkCatcher -*/ + */ if (process.env.HAWK_CATCHER_TOKEN) { HawkCatcher.init({ token: process.env.HAWK_CATCHER_TOKEN, context: { - workerTypes: workerNames.join(","), - } + workerTypes: workerNames.join(','), + }, }); } @@ -46,12 +39,10 @@ class WorkerRunner { */ private workers: Worker[] = []; - // private gateway?: promClient.Pushgateway; - /** - * Metrics push cleanup callback. + * Metrics endpoint cleanup callback. */ - private stopMetricsPushing?: () => void; + private stopMetricsServer?: () => void; /** * Create runner instance @@ -90,7 +81,7 @@ class WorkerRunner { * Run metrics exporter */ private startMetrics(): void { - if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) { + if (!process.env.PROMETHEUS_METRICS_PORT && !process.env.PROMETHEUS_PUSHGATEWAY_URL) { return; } @@ -105,10 +96,10 @@ class WorkerRunner { const workerTypeForMetrics = workerTypes.length === 1 ? workerTypes[0] : 'multi_worker_process'; if (workerTypes.length > 1) { - console.warn(`[metrics] ${workerTypes.length} workers are running in one process; pushing metrics as "${workerTypeForMetrics}" to avoid duplicated attribution`); + console.warn(`[metrics] ${workerTypes.length} workers are running in one process; exposing metrics as "${workerTypeForMetrics}"`); } - this.stopMetricsPushing = startMetricsPushing(workerTypeForMetrics); + this.stopMetricsServer = startMetricsServer(workerTypeForMetrics); } /** @@ -243,9 +234,8 @@ class WorkerRunner { */ private async stopWorker(worker: Worker): Promise { try { - // stop pushing metrics - this.stopMetricsPushing?.(); - this.stopMetricsPushing = undefined; + this.stopMetricsServer?.(); + this.stopMetricsServer = undefined; await worker.finish(); console.log( From 5885e7750ae4f82a210639458e838a5e81a3252a Mon Sep 17 00:00:00 2001 From: Kuchizu <70284260+Kuchizu@users.noreply.github.com> Date: Fri, 8 May 2026 04:51:42 +0300 Subject: [PATCH 2/4] perf(notifier): project only notifications field when loading rules (#551) * perf(notifier): project only notifications field when loading rules * test(notifier): update findOne assertion for projection arg --- workers/notifier/src/index.ts | 7 +++++-- workers/notifier/tests/worker.test.ts | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index d2d17966..c629c6d5 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -193,9 +193,12 @@ export default class NotifierWorker extends Worker { */ private async getProjectNotificationRules(projectId: string): Promise { const connection = this.accountsDb.getConnection(); - const projects = connection.collection('projects'); + const projects = connection.collection<{ notifications?: Rule[] }>('projects'); - const project = await projects.findOne({ _id: new ObjectID(projectId) }); + const project = await projects.findOne( + { _id: new ObjectID(projectId) }, + { projection: { notifications: 1 } } + ); if (!project) { throw new Error('There is no project with given id'); diff --git a/workers/notifier/tests/worker.test.ts b/workers/notifier/tests/worker.test.ts index 1f128ac1..dc438cd6 100644 --- a/workers/notifier/tests/worker.test.ts +++ b/workers/notifier/tests/worker.test.ts @@ -242,7 +242,10 @@ describe('NotifierWorker', () => { await worker.handle(message); - expect(dbQueryMock).toBeCalledWith({ _id: new ObjectID(message.projectId) }); + expect(dbQueryMock).toBeCalledWith( + { _id: new ObjectID(message.projectId) }, + { projection: { notifications: 1 } } + ); }); it('should close db connection on finish', async () => { From 2f0bad09ad335113cf4e367cd0b0674193168331 Mon Sep 17 00:00:00 2001 From: Kuchizu <70284260+Kuchizu@users.noreply.github.com> Date: Tue, 12 May 2026 01:21:20 +0300 Subject: [PATCH 3/4] perf(paymaster): batch subscription check over workspaces (#550) * perf(paymaster): batch subscription check over workspaces * test(paymaster): cover multi-batch dispatch in subscription check * refactor(paymaster): pass batch into flush and clear it explicitly --- workers/paymaster/src/index.ts | 73 ++++++++++++++++++++++----- workers/paymaster/tests/index.test.ts | 48 ++++++++++++++++++ 2 files changed, 108 insertions(+), 13 deletions(-) diff --git a/workers/paymaster/src/index.ts b/workers/paymaster/src/index.ts index 94be50d9..7d588be5 100644 --- a/workers/paymaster/src/index.ts +++ b/workers/paymaster/src/index.ts @@ -34,6 +34,33 @@ const DAYS_LEFT_ALERT = [3, 2, 1]; // eslint-disable-next-line @typescript-eslint/no-magic-numbers const DAYS_AFTER_BLOCK_TO_REMIND = [1, 2, 3, 5, 7, 30]; +/** + * Bounds concurrent updateOne / addTask calls per subscription check tick. + */ +const WORKSPACE_PROCESSING_CONCURRENCY = 25; + +const WORKSPACE_CURSOR_BATCH_SIZE = 200; + +/** + * Keep in sync with fields read by `processWorkspaceSubscriptionCheck` and its helpers. + */ +const WORKSPACE_SUBSCRIPTION_PROJECTION = { + _id: 1, + name: 1, + tariffPlanId: 1, + lastChargeDate: 1, + paidUntil: 1, + isDebug: 1, + isBlocked: 1, + blockedDate: 1, + subscriptionId: 1, +} as const; + +const PLAN_PROJECTION = { + _id: 1, + monthlyCharge: 1, +} as const; + /** * Worker to check workspaces subscription status and ban workspaces without actual subscription */ @@ -151,7 +178,10 @@ export default class PaymasterWorker extends Worker { throw new Error('Plans collection is not initialized'); } - this.plans = await this.plansCollection.find({}).toArray(); + this.plans = await this.plansCollection + .find({}) + .project(PLAN_PROJECTION) + .toArray(); if (this.plans.length === 0) { throw new Error('Please add tariff plans to the database'); @@ -195,28 +225,45 @@ export default class PaymasterWorker extends Worker { * Called periodically, enumerate through workspaces and check if today is a payday for workspace subscription */ private async handleWorkspaceSubscriptionCheckEvent(): Promise { - const workspaces = await this.workspaces.find({}).toArray(); + const cursor = this.workspaces + .find({}) + .project(WORKSPACE_SUBSCRIPTION_PROJECTION) + .batchSize(WORKSPACE_CURSOR_BATCH_SIZE); + + let batch: WorkspaceDBScheme[] = []; - await Promise.all(workspaces - .filter(workspace => { + const flush = async (currentBatch: WorkspaceDBScheme[]): Promise => { + if (currentBatch.length === 0) { + return; + } + + await Promise.all(currentBatch.map((workspace) => this.processWorkspaceSubscriptionCheck(workspace))); + }; + + try { + for await (const workspace of cursor) { /** * Skip workspace without lastChargeDate */ if (!workspace.lastChargeDate) { - const error = new Error('[Paymaster] Workspace without lastChargeDate detected'); - - HawkCatcher.send(error, { + HawkCatcher.send(new Error('[Paymaster] Workspace without lastChargeDate detected'), { workspaceId: workspace._id.toString(), }); + continue; + } + + batch.push(workspace); - return false; + if (batch.length >= WORKSPACE_PROCESSING_CONCURRENCY) { + await flush(batch); + batch = []; } + } - return true; - }) - .map( - (workspace) => this.processWorkspaceSubscriptionCheck(workspace) - )); + await flush(batch); + } finally { + await cursor.close(); + } } /** diff --git a/workers/paymaster/tests/index.test.ts b/workers/paymaster/tests/index.test.ts index 51ff31fd..23a1bd69 100644 --- a/workers/paymaster/tests/index.test.ts +++ b/workers/paymaster/tests/index.test.ts @@ -794,6 +794,54 @@ describe('PaymasterWorker', () => { MockDate.reset(); }); + test('Should process every workspace when there are several batches', async () => { + /** + * 50 > WORKSPACE_PROCESSING_CONCURRENCY (25), so the subscription check + * has to flush more than one batch. + */ + const WORKSPACES_COUNT = 50; + const currentDate = new Date('2005-12-22'); + const plan = createPlanMock({ + monthlyCharge: 0, + isDefault: true, + }); + + const workspaces = Array.from({ length: WORKSPACES_COUNT }, () => + createWorkspaceMock({ + plan, + subscriptionId: null, + lastChargeDate: new Date('2005-11-22'), + isBlocked: false, + billingPeriodEventsCount: 0, + }) + ); + + await tariffCollection.insertOne(plan); + await workspacesCollection.insertMany(workspaces); + + MockDate.set(currentDate); + + const worker = new PaymasterWorker(); + const processSpy = jest + .spyOn(worker as any, 'processWorkspaceSubscriptionCheck') + .mockResolvedValue([null, false]); + + await worker.start(); + await worker.handle(WORKSPACE_SUBSCRIPTION_CHECK); + await worker.finish(); + + expect(processSpy).toHaveBeenCalledTimes(WORKSPACES_COUNT); + + const calledIds = processSpy.mock.calls + .map((call) => (call[0] as WorkspaceDBScheme)._id.toString()) + .sort(); + const expectedIds = workspaces.map((w) => w._id.toString()).sort(); + + expect(calledIds).toEqual(expectedIds); + + MockDate.reset(); + }); + afterAll(async () => { await connection.close(); MockDate.reset(); From 8f68626e3321eb3e278a088febed5c621d6af369 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Wed, 13 May 2026 19:15:57 +0300 Subject: [PATCH 4/4] Imp(limiter): fetch workspaces with cursor (#554) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * imp(): fetch workspaces one by one * imp(): add workspace projection * imp(): fix function overload * fix(): eslint and types * fix(): missing await --------- Co-authored-by: Егор Коновалов --- workers/limiter/src/dbHelper.ts | 109 +++++++++++++++++-------- workers/limiter/src/index.ts | 14 ++-- workers/limiter/tests/dbHelper.test.ts | 18 ++-- workers/limiter/types/index.ts | 7 +- 4 files changed, 101 insertions(+), 47 deletions(-) diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index b66336e9..dbe45da0 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -2,7 +2,7 @@ import { Collection, Db, ObjectId } from 'mongodb'; import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; import { WorkspaceWithTariffPlan } from '../types'; import HawkCatcher from '@hawk.so/nodejs'; -import { CriticalError } from '../../../lib/workerErrors'; +import { CriticalError, NonCriticalError } from '../../../lib/workerErrors'; /** * Class that implements methods used for interaction between limiter and db @@ -35,49 +35,24 @@ export class DbHelper { } /** - * Method that returns all workspaces with their tariff plans + * Method that yields all workspaces with their tariff plans */ - public async getWorkspacesWithTariffPlans():Promise; + public getWorkspacesWithTariffPlans(): AsyncGenerator; /** * Method that returns workspace with its tariff plan by its id * * @param id - id of the workspace to fetch */ - public async getWorkspacesWithTariffPlans(id: string):Promise; + public getWorkspacesWithTariffPlans(id: string): Promise; /** - * Returns workspace with its tariff plan by its id - * - * @param id - workspace id + * @param id - id of the workspace to fetch */ - public async getWorkspacesWithTariffPlans(id?: string):Promise { - /* eslint-disable-next-line */ - const queue: any[] = [ - { - $lookup: { - from: 'plans', - localField: 'tariffPlanId', - foreignField: '_id', - as: 'tariffPlan', - }, - }, - { - $unwind: { - path: '$tariffPlan', - }, - }, - ]; - + public getWorkspacesWithTariffPlans(id?: string): AsyncGenerator | Promise { if (id !== undefined) { - queue.unshift({ - $match: { - _id: new ObjectId(id), - }, - }); + return this.getOneWorkspaceWithTariffPlan(id); } - const workspacesArray = await this.workspacesCollection.aggregate(queue).toArray(); - - return (id !== undefined) ? workspacesArray[0] : workspacesArray; + return this.yieldWorkspacesWithTariffPlans(); } /** @@ -172,4 +147,72 @@ export class DbHelper { return this.projectsCollection.find(query).toArray(); } + + /** + * Returns a single workspace with its tariff plan by id + * + * @param id - workspace id + */ + private async getOneWorkspaceWithTariffPlan(id: string): Promise { + const pipeline = [ + { + $match: { + _id: new ObjectId(id), + }, + }, + ...this.tariffPlanLookupPipeline(), + ]; + + const workspace = await this.workspacesCollection.aggregate(pipeline).next(); + + if (workspace === null) { + throw new NonCriticalError(`Workspace ${id} not found`, { + workspaceId: id, + }); + } + + return workspace; + } + + /** + * Yields all workspaces with their tariff plans one by one + */ + private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator { + const pipeline = this.tariffPlanLookupPipeline(); + const cursor = this.workspacesCollection.aggregate(pipeline); + + for await (const workspace of cursor) { + yield workspace; + } + } + + /* eslint-disable-next-line */ + private tariffPlanLookupPipeline(): any[] { + return [ + { + $lookup: { + from: 'plans', + localField: 'tariffPlanId', + foreignField: '_id', + as: 'tariffPlan', + }, + }, + { + $unwind: { + path: '$tariffPlan', + }, + }, + { + $project: { + _id: 1, + name: 1, + isBlocked: 1, + blockedDate: 1, + lastChargeDate: 1, + billingPeriodEventsCount: 1, + tariffPlan: 1, + }, + }, + ]; + } } \ No newline at end of file diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 9cb2abf6..82ed2539 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -189,16 +189,16 @@ export default class LimiterWorker extends Worker { private async handleRegularWorkspacesCheck(): Promise { let message = ''; - const workspaces = await this.dbHelper.getWorkspacesWithTariffPlans(); + const workspaces = this.dbHelper.getWorkspacesWithTariffPlans(); const updatedWorkspaces: WorkspaceWithTariffPlan[] = []; - await Promise.all(workspaces.map(async (workspace) => { + for await (const workspace of workspaces) { /** * If workspace is already blocked - do nothing */ if (workspace.isBlocked) { - return; + continue; } const workspaceProjects = await this.dbHelper.getProjects(workspace._id.toString()); @@ -211,7 +211,7 @@ export default class LimiterWorker extends Worker { * If there are no projects to update - move on to next workspace */ if (projectsToUpdate.length === 0) { - return; + continue; } /** @@ -223,12 +223,12 @@ export default class LimiterWorker extends Worker { updatedWorkspace.isBlocked = true; updatedWorkspace.blockedDate = new Date(); - this.redis.appendBannedProjects(projectIds); + await this.redis.appendBannedProjects(projectIds); message += this.formSingleWorkspaceMessage(updatedWorkspace, projectsToUpdate, 'blocked'); } - })); + } - this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces); + await this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces); this.sendRegularReport(message); } diff --git a/workers/limiter/tests/dbHelper.test.ts b/workers/limiter/tests/dbHelper.test.ts index 3a4a49f8..947b070a 100644 --- a/workers/limiter/tests/dbHelper.test.ts +++ b/workers/limiter/tests/dbHelper.test.ts @@ -159,16 +159,22 @@ describe('DbHelper', () => { /** * Act */ - const result = await dbHelper.getWorkspacesWithTariffPlans(); + const cursor = dbHelper.getWorkspacesWithTariffPlans(); + + const workspaces = []; + + for await (const workspace of cursor) { + workspaces.push(workspace); + } /** * Assert */ - expect(result).toHaveLength(2); - expect(result[0].tariffPlan).toBeDefined(); - expect(result[1].tariffPlan).toBeDefined(); - expect(result[0].tariffPlan.eventsLimit).toBe(10); - expect(result[1].tariffPlan.eventsLimit).toBe(10000); + expect(workspaces).toHaveLength(2); + expect(workspaces[0].tariffPlan).toBeDefined(); + expect(workspaces[1].tariffPlan).toBeDefined(); + expect(workspaces[0].tariffPlan.eventsLimit).toBe(10); + expect(workspaces[1].tariffPlan.eventsLimit).toBe(10000); }); test('Should return single workspace with its tariff plan by id', async () => { diff --git a/workers/limiter/types/index.ts b/workers/limiter/types/index.ts index c04c3936..e788e5dc 100644 --- a/workers/limiter/types/index.ts +++ b/workers/limiter/types/index.ts @@ -3,4 +3,9 @@ import { PlanDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; /** * Workspace with its tariff plan */ -export type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme}; +export type WorkspaceWithTariffPlan = Pick< + WorkspaceDBScheme, + '_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' +> & { + tariffPlan: PlanDBScheme; +};