From 185212d22d2cf559d3ff0ef26a73a2dce966432a Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 31 Jan 2026 02:20:15 +0500 Subject: [PATCH 01/11] chore(grouper): add counters to the grouper worker --- workers/grouper/src/index.ts | 181 +++++++++++++++++ workers/grouper/src/redisHelper.ts | 213 ++++++++++++++++++- workers/grouper/tests/index.test.ts | 304 ++++++++++++++++++++++++++++ 3 files changed, 697 insertions(+), 1 deletion(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 52c71ca8..e11f9b4e 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -31,6 +31,7 @@ import { hasValue } from '../../../lib/utils/hasValue'; */ /* eslint-disable-next-line no-unused-vars */ import { memoize } from '../../../lib/memoize'; +import TimeMs from '../../../lib/utils/time'; /** * eslint does not count decorators as a variable usage @@ -306,6 +307,186 @@ export default class GrouperWorker extends Worker { }); } } + + await this.incrementRateLimitCounter(task.projectId); + await this.recordProjectMetrics(task.projectId, 'events-stored'); + } + + /** + * Build RedisTimeSeries key for project metrics. + * + * @param projectId - id of the project + * @param metricType - metric type identifier + * @param granularity - time granularity + */ + private getTimeSeriesKey( + projectId: string, + metricType: string, + granularity: 'minutely' | 'hourly' | 'daily' + ): string { + return `ts:project-${metricType}:${projectId}:${granularity}`; + } + + /** + * Record project metrics to Redis TimeSeries. + * + * @param projectId - id of the project + * @param metricType - metric type identifier + */ + private async recordProjectMetrics(projectId: string, metricType: string): Promise { + const minutelyKey = this.getTimeSeriesKey(projectId, metricType, 'minutely'); + const hourlyKey = this.getTimeSeriesKey(projectId, metricType, 'hourly'); + const dailyKey = this.getTimeSeriesKey(projectId, metricType, 'daily'); + + const labels: Record = { + type: 'error', + status: metricType, + project: projectId, + }; + + const series = [ + { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY }, + { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK }, + { key: dailyKey, label: 'daily', retentionMs: 90 * TimeMs.DAY }, + ]; + + for (const { key, label, retentionMs } of series) { + try { + await this.redis.safeTsAdd(key, 1, labels, retentionMs); + } catch (error) { + this.logger.error(`Failed to add ${label} TS for ${metricType}`, error); + } + } + } + + /** + * Increment rate limit counters for the project. + * + * @param projectId - id of the project + */ + private async incrementRateLimitCounter(projectId: string): Promise { + try { + const settings = await this.getProjectRateLimitSettings(projectId); + + if (!settings) { + return; + } + + await this.redis.incrementRateLimitCounterForCurrentEvent( + projectId, + settings.eventsPeriod, + settings.eventsLimit + ); + } catch (error) { + this.logger.error(`Failed to increment rate limit counter for project ${projectId}`, error); + } + } + + /** + * Fetch and normalize rate limit settings + * Rate limit settings could appear in tarifPlan, workspace and project. + * All rateLimits have different priority. + * + * @param projectId - id of the project + */ + @memoize({ max: 200, ttl: MEMOIZATION_TTL, strategy: 'concat', skipCache: [null] }) + private async getProjectRateLimitSettings(projectId: string): Promise<{ eventsLimit: number; eventsPeriod: number } | null> { + if (!projectId || !mongodb.ObjectID.isValid(projectId)) { + return null; + } + + const accountsDb = this.accountsDb.getConnection(); + + /** + * Fetch project from the db + */ + const project = await accountsDb + .collection('projects') + .findOne( + { _id: new mongodb.ObjectId(projectId) }, + { projection: { rateLimitSettings: 1, workspaceId: 1 } } + ); + + if (!project) { + return null; + } + + const projectRateLimitSettings = project.rateLimitSettings as { N: number, T: number }; + const workspaceId = new mongodb.ObjectID(project.workspaceId); + + let planRateLimitSettings: { N: number, T: number}; + let workspaceRateLimitSettings: { N: number, T: number}; + + /** + * Fetch workspace from the db + */ + if (workspaceId) { + const workspace = await accountsDb + .collection('workspaces') + .findOne( + { _id: workspaceId }, + { projection: { rateLimitSettings: 1, tariffPlanId: 1 } } + ); + + workspaceRateLimitSettings = workspace?.rateLimitSettings as { N: number, T: number }; + + const planId = new mongodb.ObjectId(workspace?.tariffPlanId); + + /** + * Tarif plan from the db + */ + if (planId) { + const plan = await accountsDb + .collection('plans') + .findOne( + { _id: planId }, + { projection: { rateLimitSettings: 1 } } + ); + + planRateLimitSettings = plan?.rateLimitSettings; + } + } + + return this.normalizeRateLimitSettings( + planRateLimitSettings, + workspaceRateLimitSettings, + projectRateLimitSettings + ); + } + + /** + * Normalize rate limit settings shape from database. + * + * @param rateLimitLayers - raw settings documents in priority order + */ + private normalizeRateLimitSettings( + ...rateLimitLayers: { N: number, T: number }[] + ): { eventsLimit: number; eventsPeriod: number } | null { + let eventsLimit = 0; + let eventsPeriod = 0; + + for (const layer of rateLimitLayers) { + if (!layer) { + continue; + } + + const limit = layer.N as number; + const period = layer.T as number; + + if (limit !== undefined && limit > 0) { + eventsLimit = limit; + } + + if (period !== undefined && period > 0) { + eventsPeriod = period; + } + } + + if (eventsLimit <= 0 || eventsPeriod <= 0) { + return null; + } + + return { eventsLimit, eventsPeriod }; } /** diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index a655c24b..f0f45a7e 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -110,6 +110,217 @@ export default class RedisHelper { return result === null; } + /** + * Increments redis counter used for rate limiting + * + * @param projectId - id of the project which event belongs to + * @param eventsPeriod - rate limit period configured for the project + */ + public async incrementRateLimitCounterForCurrentEvent(projectId: string, eventsPeriod: number, limit: number): Promise { + const script = ` + local key = KEYS[1] + local field = ARGV[1] + local now = tonumber(ARGV[2]) + local period = tonumber(ARGV[3]) + local limit = tonumber(ARGV[4]) + + local current = redis.call('HGET', key, field) + + -- If no record yet, start a new window with count = 1 + if not current then + redis.call('HSET', key, field, now .. ':1') + return + end + + local timestamp, count = string.match(current, '(%d+):(%d+)') + timestamp = tonumber(timestamp) + count = tonumber(count) + + -- Check if we're in a new time window + if now - timestamp >= period then + redis.call('HSET', key, field, now .. ':1') + return + end + + -- Check if incrementing would exceed limit + if count + 1 > limit then + return + end + + -- Increment counter + redis.call('HSET', key, field, timestamp .. ':' .. (count + 1)) + ` + + const key = 'rate_limits'; + const now = Math.floor(Date.now() / 1000); + + await this.redisClient.eval(script, { + keys: [key], + arguments: [projectId, now.toString(), eventsPeriod.toString(), limit.toString()], + }); + } + + /** + * Build label arguments for RedisTimeSeries commands + * + * @param labels - labels to attach to the time series + */ + private buildLabelArguments(labels: Record): string[] { + const labelArgs: string[] = ['LABELS']; + + for (const [labelKey, labelValue] of Object.entries(labels)) { + labelArgs.push(labelKey, labelValue); + } + + return labelArgs; + } + + /** + * Creates a RedisTimeSeries key if it doesn't exist. + * + * @param key - time series key + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async tsCreateIfNotExists( + key: string, + labels: Record, + retentionMs = 0 + ): Promise { + const exists = await this.redisClient.exists(key); + + if (exists > 0) { + return; + } + + const args: string[] = ['TS.CREATE', key]; + + if (retentionMs > 0) { + args.push('RETENTION', Math.floor(retentionMs).toString()); + } + + args.push(...this.buildLabelArguments(labels)); + + await this.redisClient.sendCommand(args); + } + + /** + * Increments a RedisTimeSeries key with labels and timestamp. + * + * @param key - time series key + * @param value - value to increment by + * @param timestampMs - timestamp in milliseconds, defaults to current time + * @param labels - labels to attach to the sample + */ + public async tsIncrBy( + key: string, + value: number, + timestampMs = 0, + labels: Record = {} + ): Promise { + const labelArgs = this.buildLabelArguments(labels); + const timestamp = timestampMs === 0 ? Date.now() : timestampMs; + + const args: string[] = [ + 'TS.INCRBY', + key, + value.toString(), + 'TIMESTAMP', + Math.floor(timestamp).toString(), + ...labelArgs, + ]; + + await this.redisClient.sendCommand(args); + } + + /** + * Ensures that a RedisTimeSeries key exists and increments it safely. + * + * @param key - time series key + * @param value - value to increment by + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async safeTsIncrBy( + key: string, + value: number, + labels: Record, + retentionMs = 0 + ): Promise { + const timestamp = Date.now(); + + try { + await this.tsIncrBy(key, value, timestamp, labels); + } catch (error) { + if (error instanceof Error && error.message.includes('TSDB: key does not exist')) { + this.logger.warn(`TS key ${key} does not exist, creating it...`); + await this.tsCreateIfNotExists(key, labels, retentionMs); + await this.tsIncrBy(key, value, timestamp, labels); + } else { + throw error; + } + } + } + + /** + * Adds a sample to a RedisTimeSeries key. + * + * @param key - time series key + * @param value - value to add + * @param timestampMs - timestamp in milliseconds, defaults to current time + * @param labels - labels to attach to the sample + */ + public async tsAdd( + key: string, + value: number, + timestampMs = 0, + labels: Record = {} + ): Promise { + const labelArgs = this.buildLabelArguments(labels); + const timestamp = timestampMs === 0 ? Date.now() : timestampMs; + + const args: string[] = [ + 'TS.ADD', + key, + Math.floor(timestamp).toString(), + value.toString(), + 'ON_DUPLICATE', + 'SUM', + ...labelArgs, + ]; + + await this.redisClient.sendCommand(args); + } + + /** + * Ensures that a RedisTimeSeries key exists and adds a sample safely. + * + * @param key - time series key + * @param value - value to add + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async safeTsAdd( + key: string, + value: number, + labels: Record, + retentionMs = 0 + ): Promise { + const timestamp = Date.now(); + + try { + await this.tsAdd(key, value, timestamp, labels); + } catch (error) { + if (error instanceof Error && error.message.includes('TSDB: key does not exist')) { + this.logger.warn(`TS key ${key} does not exist, creating it...`); + await this.tsCreateIfNotExists(key, labels, retentionMs); + await this.tsAdd(key, value, timestamp, labels); + } else { + throw error; + } + } + } + /** * Creates callback function for Redis operations * @@ -130,4 +341,4 @@ export default class RedisHelper { resolve(resp !== 'OK'); }; } -} +} \ No newline at end of file diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index ee781e98..2ae61810 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -7,6 +7,7 @@ import type { Collection } from 'mongodb'; import { MongoClient } from 'mongodb'; import type { ErrorsCatcherType, EventAddons, EventData } from '@hawk.so/types'; import { MS_IN_SEC } from '../../../lib/utils/consts'; +import TimeMs from '../../../lib/utils/time'; import * as mongodb from 'mongodb'; import { patch } from '@n1ru4l/json-patch-plus'; @@ -57,16 +58,41 @@ const projectIdMock = '5d206f7f9aaf7c0071d64596'; /** * Mock project data */ +const planIdMock = new mongodb.ObjectId(); +const workspaceIdMock = new mongodb.ObjectId(); + +const planMock = { + _id: planIdMock, + rateLimitSettings: { + N: 0, + T: 0, + }, +}; + +const workspaceMock = { + _id: workspaceIdMock, + tariffPlanId: planIdMock, + rateLimitSettings: { + N: 0, + T: 0, + }, +}; + const projectMock = { _id: new mongodb.ObjectId(projectIdMock), id: projectIdMock, name: 'Test Project', token: 'test-token', + workspaceId: workspaceIdMock, uidAdded: { id: 'test-user-id', }, unreadCount: 0, description: 'Test project for grouper worker tests', + rateLimitSettings: { + N: 0, + T: 0, + }, eventGroupingPatterns: [ { _id: mongodb.ObjectId(), pattern: 'New error .*', @@ -113,8 +139,30 @@ describe('GrouperWorker', () => { let dailyEventsCollection: Collection; let repetitionsCollection: Collection; let projectsCollection: Collection; + let workspacesCollection: Collection; + let plansCollection: Collection; let redisClient: RedisClientType; let worker: GrouperWorker; + const setPlanRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { + await plansCollection.updateOne( + { _id: planIdMock }, + { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { upsert: true } + ); + }; + const setWorkspaceRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { + await workspacesCollection.updateOne( + { _id: workspaceIdMock }, + { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { upsert: true } + ); + }; + const setProjectRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { + await projectsCollection.updateOne( + { _id: new mongodb.ObjectId(projectIdMock) }, + { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + ); + }; beforeAll(async () => { worker = new GrouperWorker(); @@ -133,12 +181,17 @@ describe('GrouperWorker', () => { dailyEventsCollection = connection.db().collection('dailyEvents:' + projectIdMock); repetitionsCollection = connection.db().collection('repetitions:' + projectIdMock); projectsCollection = accountsConnection.db().collection('projects'); + workspacesCollection = accountsConnection.db().collection('workspaces'); + plansCollection = accountsConnection.db().collection('plans'); /** * Create unique index for groupHash */ await eventsCollection.createIndex({ groupHash: 1 }, { unique: true }); + await plansCollection.insertOne(planMock); + await workspacesCollection.insertOne(workspaceMock); + /** * Insert mock project into accounts database */ @@ -155,9 +208,13 @@ describe('GrouperWorker', () => { */ beforeEach(async () => { worker.clearCache(); + delete (worker as any)['memoizeCache:getProjectRateLimitSettings']; await eventsCollection.deleteMany({}); await dailyEventsCollection.deleteMany({}); await repetitionsCollection.deleteMany({}); + await setPlanRateLimit(0, 0); + await setWorkspaceRateLimit(0, 0); + await setProjectRateLimit(0, 0); }); afterEach(async () => { @@ -743,10 +800,257 @@ describe('GrouperWorker', () => { }); }); + describe('Rate limits counter increment', () => { + const rateLimitsKey = 'rate_limits'; + + test('increments counter when handling an event', async () => { + await setProjectRateLimit(5, 60); + + let currentTime = 1_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + await worker.handle(generateTask()); + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).toBe(`${Math.floor(currentTime / 1000)}:1`); + } finally { + nowSpy.mockRestore(); + } + }); + + test('reuses window and increments while within limit', async () => { + await setProjectRateLimit(5, 60); + + let currentTime = 2_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + await worker.handle(generateTask()); + await worker.handle(generateTask()); + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(2); + } finally { + nowSpy.mockRestore(); + } + }); + + test('does not exceed configured limit within same window', async () => { + const eventsLimit = 3; + + await setProjectRateLimit(eventsLimit, 60); + + let currentTime = 3_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < eventsLimit + 2; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(eventsLimit); + } finally { + nowSpy.mockRestore(); + } + }); + + test('resets window after period elapses', async () => { + await setProjectRateLimit(5, 2); + + let currentTime = 4_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + await worker.handle(generateTask()); + + currentTime += 3_000; // advance by 3 seconds + + await worker.handle(generateTask()); + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [timestamp, count] = (storedValue as string).split(':'); + + expect(Number(timestamp)).toBe(Math.floor(currentTime / 1000)); + expect(Number(count)).toBe(1); + } finally { + nowSpy.mockRestore(); + } + }); + + test('uses workspace limits when project overrides are absent', async () => { + await setPlanRateLimit(10, 60); + await setWorkspaceRateLimit(3, 60); + await setProjectRateLimit(0, 0); + + let currentTime = 5_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < 5; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(3); + } finally { + nowSpy.mockRestore(); + } + }); + + test('falls back to plan limits when workspace settings are empty', async () => { + await setPlanRateLimit(4, 60); + await setWorkspaceRateLimit(0, 0); + await setProjectRateLimit(0, 0); + + let currentTime = 6_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < 6; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(4); + } finally { + nowSpy.mockRestore(); + } + }); + + test('prefers project limits over workspace and plan', async () => { + await setPlanRateLimit(4, 60); + await setWorkspaceRateLimit(6, 60); + await setProjectRateLimit(8, 60); + + let currentTime = 7_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < 10; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(8); + } finally { + nowSpy.mockRestore(); + } + }); + }); + + describe('Events-stored metrics', () => { + test('writes minutely, hourly, and daily samples after handling an event', async () => { + const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); + + try { + await worker.handle(generateTask()); + + expect(safeTsAddSpy).toHaveBeenCalledTimes(3); + + const expectedLabels = { + type: 'error', + status: 'events-stored', + project: projectIdMock, + }; + + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 1, + `ts:project-events-stored:${projectIdMock}:minutely`, + 1, + expectedLabels, + TimeMs.DAY, + ); + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 2, + `ts:project-events-stored:${projectIdMock}:hourly`, + 1, + expectedLabels, + TimeMs.WEEK, + ); + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 3, + `ts:project-events-stored:${projectIdMock}:daily`, + 1, + expectedLabels, + 90 * TimeMs.DAY, + ); + } finally { + safeTsAddSpy.mockRestore(); + } + }); + + test('logs when a time-series write fails but continues processing', async () => { + const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); + const loggerErrorSpy = jest.spyOn((worker as any).logger, 'error').mockImplementation(() => undefined); + const failure = new Error('TS failure'); + + safeTsAddSpy + .mockImplementationOnce(() => Promise.resolve()) + .mockImplementationOnce(async () => { throw failure; }) + .mockImplementationOnce(() => Promise.resolve()); + + try { + await worker.handle(generateTask()); + + expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-stored', failure); + expect(await eventsCollection.find().count()).toBe(1); + } finally { + safeTsAddSpy.mockRestore(); + loggerErrorSpy.mockRestore(); + } + }); + + test('records metrics exactly once per handled event', async () => { + const recordMetricsSpy = jest.spyOn(worker as any, 'recordProjectMetrics'); + + try { + await worker.handle(generateTask()); + + expect(recordMetricsSpy).toHaveBeenCalledTimes(1); + expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-stored'); + } finally { + recordMetricsSpy.mockRestore(); + } + }); + }); + afterAll(async () => { await redisClient.quit(); await worker.finish(); await projectsCollection.deleteMany({}); + await workspacesCollection.deleteMany({}); + await plansCollection.deleteMany({}); await accountsConnection.close(); await connection.close(); }); From ba2d283135e062002fb622b1c7bc34aef5fa8dcf Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 7 Feb 2026 17:49:23 +0300 Subject: [PATCH 02/11] chore(): eslint fix --- workers/grouper/src/index.ts | 1 + workers/grouper/src/redisHelper.ts | 33 +++++++++++++++--------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index e11f9b4e..7e10f824 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -347,6 +347,7 @@ export default class GrouperWorker extends Worker { const series = [ { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY }, { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK }, + /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ { key: dailyKey, label: 'daily', retentionMs: 90 * TimeMs.DAY }, ]; diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index f0f45a7e..0d207208 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -2,6 +2,7 @@ import HawkCatcher from '@hawk.so/nodejs'; import type { RedisClientType } from 'redis'; import { createClient } from 'redis'; import createLogger from '../../../lib/logger'; +import { MS_IN_SEC } from '../../../lib/utils/consts'; /** * Class with helper functions for working with Redis @@ -152,7 +153,7 @@ export default class RedisHelper { ` const key = 'rate_limits'; - const now = Math.floor(Date.now() / 1000); + const now = Math.floor(Date.now() / MS_IN_SEC); await this.redisClient.eval(script, { keys: [key], @@ -160,21 +161,6 @@ export default class RedisHelper { }); } - /** - * Build label arguments for RedisTimeSeries commands - * - * @param labels - labels to attach to the time series - */ - private buildLabelArguments(labels: Record): string[] { - const labelArgs: string[] = ['LABELS']; - - for (const [labelKey, labelValue] of Object.entries(labels)) { - labelArgs.push(labelKey, labelValue); - } - - return labelArgs; - } - /** * Creates a RedisTimeSeries key if it doesn't exist. * @@ -321,6 +307,21 @@ export default class RedisHelper { } } + /** + * Build label arguments for RedisTimeSeries commands + * + * @param labels - labels to attach to the time series + */ + private buildLabelArguments(labels: Record): string[] { + const labelArgs: string[] = ['LABELS']; + + for (const [labelKey, labelValue] of Object.entries(labels)) { + labelArgs.push(labelKey, labelValue); + } + + return labelArgs; + } + /** * Creates callback function for Redis operations * From 48cbf6bc3cd5bb629512dbdb33b6b143f89304d5 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 7 Feb 2026 17:59:33 +0300 Subject: [PATCH 03/11] chore(): clean up --- workers/grouper/src/index.ts | 39 ++++++++++++++++++----- workers/grouper/src/redisHelper.ts | 7 +++-- workers/grouper/tests/index.test.ts | 49 +++++++++++++++++++++-------- 3 files changed, 72 insertions(+), 23 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 7e10f824..ec366829 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -345,10 +345,22 @@ export default class GrouperWorker extends Worker { }; const series = [ - { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY }, - { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK }, - /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ - { key: dailyKey, label: 'daily', retentionMs: 90 * TimeMs.DAY }, + { + key: minutelyKey, + label: 'minutely', + retentionMs: TimeMs.DAY, + }, + { + key: hourlyKey, + label: 'hourly', + retentionMs: TimeMs.WEEK, + }, + { + key: dailyKey, + label: 'daily', + // eslint-disable-next-line @typescript-eslint/no-magic-numbers + retentionMs: 90 * TimeMs.DAY, + }, ]; for (const { key, label, retentionMs } of series) { @@ -405,7 +417,12 @@ export default class GrouperWorker extends Worker { .collection('projects') .findOne( { _id: new mongodb.ObjectId(projectId) }, - { projection: { rateLimitSettings: 1, workspaceId: 1 } } + { + projection: { + rateLimitSettings: 1, + workspaceId: 1, + }, + } ); if (!project) { @@ -426,7 +443,12 @@ export default class GrouperWorker extends Worker { .collection('workspaces') .findOne( { _id: workspaceId }, - { projection: { rateLimitSettings: 1, tariffPlanId: 1 } } + { + projection: { + rateLimitSettings: 1, + tariffPlanId: 1, + }, + } ); workspaceRateLimitSettings = workspace?.rateLimitSettings as { N: number, T: number }; @@ -487,7 +509,10 @@ export default class GrouperWorker extends Worker { return null; } - return { eventsLimit, eventsPeriod }; + return { + eventsLimit, + eventsPeriod, + }; } /** diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index 0d207208..bed08418 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -116,6 +116,7 @@ export default class RedisHelper { * * @param projectId - id of the project which event belongs to * @param eventsPeriod - rate limit period configured for the project + * @param limit - current event count limit (from project / workspace / plan) */ public async incrementRateLimitCounterForCurrentEvent(projectId: string, eventsPeriod: number, limit: number): Promise { const script = ` @@ -150,13 +151,13 @@ export default class RedisHelper { -- Increment counter redis.call('HSET', key, field, timestamp .. ':' .. (count + 1)) - ` + `; const key = 'rate_limits'; const now = Math.floor(Date.now() / MS_IN_SEC); await this.redisClient.eval(script, { - keys: [key], + keys: [ key ], arguments: [projectId, now.toString(), eventsPeriod.toString(), limit.toString()], }); } @@ -313,7 +314,7 @@ export default class RedisHelper { * @param labels - labels to attach to the time series */ private buildLabelArguments(labels: Record): string[] { - const labelArgs: string[] = ['LABELS']; + const labelArgs: string[] = [ 'LABELS' ]; for (const [labelKey, labelValue] of Object.entries(labels)) { labelArgs.push(labelKey, labelValue); diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index 2ae61810..34bad1cc 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -146,21 +146,42 @@ describe('GrouperWorker', () => { const setPlanRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { await plansCollection.updateOne( { _id: planIdMock }, - { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { + $set: { + rateLimitSettings: { + N: eventsLimit, + T: eventsPeriod, + }, + }, + }, { upsert: true } ); }; const setWorkspaceRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { await workspacesCollection.updateOne( { _id: workspaceIdMock }, - { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { + $set: { + rateLimitSettings: { + N: eventsLimit, + T: eventsPeriod, + }, + }, + }, { upsert: true } ); }; const setProjectRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { await projectsCollection.updateOne( { _id: new mongodb.ObjectId(projectIdMock) }, - { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { + $set: { + rateLimitSettings: { + N: eventsLimit, + T: eventsPeriod, + }, + }, + } ); }; @@ -806,7 +827,7 @@ describe('GrouperWorker', () => { test('increments counter when handling an event', async () => { await setProjectRateLimit(5, 60); - let currentTime = 1_000_000; + const currentTime = 1_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -823,7 +844,7 @@ describe('GrouperWorker', () => { test('reuses window and increments while within limit', async () => { await setProjectRateLimit(5, 60); - let currentTime = 2_000_000; + const currentTime = 2_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -847,7 +868,7 @@ describe('GrouperWorker', () => { await setProjectRateLimit(eventsLimit, 60); - let currentTime = 3_000_000; + const currentTime = 3_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -898,7 +919,7 @@ describe('GrouperWorker', () => { await setWorkspaceRateLimit(3, 60); await setProjectRateLimit(0, 0); - let currentTime = 5_000_000; + const currentTime = 5_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -923,7 +944,7 @@ describe('GrouperWorker', () => { await setWorkspaceRateLimit(0, 0); await setProjectRateLimit(0, 0); - let currentTime = 6_000_000; + const currentTime = 6_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -948,7 +969,7 @@ describe('GrouperWorker', () => { await setWorkspaceRateLimit(6, 60); await setProjectRateLimit(8, 60); - let currentTime = 7_000_000; + const currentTime = 7_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -989,21 +1010,21 @@ describe('GrouperWorker', () => { `ts:project-events-stored:${projectIdMock}:minutely`, 1, expectedLabels, - TimeMs.DAY, + TimeMs.DAY ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 2, `ts:project-events-stored:${projectIdMock}:hourly`, 1, expectedLabels, - TimeMs.WEEK, + TimeMs.WEEK ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 3, `ts:project-events-stored:${projectIdMock}:daily`, 1, expectedLabels, - 90 * TimeMs.DAY, + 90 * TimeMs.DAY ); } finally { safeTsAddSpy.mockRestore(); @@ -1017,7 +1038,9 @@ describe('GrouperWorker', () => { safeTsAddSpy .mockImplementationOnce(() => Promise.resolve()) - .mockImplementationOnce(async () => { throw failure; }) + .mockImplementationOnce(async () => { + throw failure; + }) .mockImplementationOnce(() => Promise.resolve()); try { From 78f24cd2e1114f5229c9b6aab4d3d3790620f170 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 20:05:48 +0300 Subject: [PATCH 04/11] chore(grouper): remove redundant rate-limit increment logic --- workers/grouper/src/index.ts | 144 ------------------- workers/grouper/src/redisHelper.ts | 51 ------- workers/grouper/tests/index.test.ts | 215 ---------------------------- 3 files changed, 410 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index ec366829..34792177 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -308,7 +308,6 @@ export default class GrouperWorker extends Worker { } } - await this.incrementRateLimitCounter(task.projectId); await this.recordProjectMetrics(task.projectId, 'events-stored'); } @@ -372,149 +371,6 @@ export default class GrouperWorker extends Worker { } } - /** - * Increment rate limit counters for the project. - * - * @param projectId - id of the project - */ - private async incrementRateLimitCounter(projectId: string): Promise { - try { - const settings = await this.getProjectRateLimitSettings(projectId); - - if (!settings) { - return; - } - - await this.redis.incrementRateLimitCounterForCurrentEvent( - projectId, - settings.eventsPeriod, - settings.eventsLimit - ); - } catch (error) { - this.logger.error(`Failed to increment rate limit counter for project ${projectId}`, error); - } - } - - /** - * Fetch and normalize rate limit settings - * Rate limit settings could appear in tarifPlan, workspace and project. - * All rateLimits have different priority. - * - * @param projectId - id of the project - */ - @memoize({ max: 200, ttl: MEMOIZATION_TTL, strategy: 'concat', skipCache: [null] }) - private async getProjectRateLimitSettings(projectId: string): Promise<{ eventsLimit: number; eventsPeriod: number } | null> { - if (!projectId || !mongodb.ObjectID.isValid(projectId)) { - return null; - } - - const accountsDb = this.accountsDb.getConnection(); - - /** - * Fetch project from the db - */ - const project = await accountsDb - .collection('projects') - .findOne( - { _id: new mongodb.ObjectId(projectId) }, - { - projection: { - rateLimitSettings: 1, - workspaceId: 1, - }, - } - ); - - if (!project) { - return null; - } - - const projectRateLimitSettings = project.rateLimitSettings as { N: number, T: number }; - const workspaceId = new mongodb.ObjectID(project.workspaceId); - - let planRateLimitSettings: { N: number, T: number}; - let workspaceRateLimitSettings: { N: number, T: number}; - - /** - * Fetch workspace from the db - */ - if (workspaceId) { - const workspace = await accountsDb - .collection('workspaces') - .findOne( - { _id: workspaceId }, - { - projection: { - rateLimitSettings: 1, - tariffPlanId: 1, - }, - } - ); - - workspaceRateLimitSettings = workspace?.rateLimitSettings as { N: number, T: number }; - - const planId = new mongodb.ObjectId(workspace?.tariffPlanId); - - /** - * Tarif plan from the db - */ - if (planId) { - const plan = await accountsDb - .collection('plans') - .findOne( - { _id: planId }, - { projection: { rateLimitSettings: 1 } } - ); - - planRateLimitSettings = plan?.rateLimitSettings; - } - } - - return this.normalizeRateLimitSettings( - planRateLimitSettings, - workspaceRateLimitSettings, - projectRateLimitSettings - ); - } - - /** - * Normalize rate limit settings shape from database. - * - * @param rateLimitLayers - raw settings documents in priority order - */ - private normalizeRateLimitSettings( - ...rateLimitLayers: { N: number, T: number }[] - ): { eventsLimit: number; eventsPeriod: number } | null { - let eventsLimit = 0; - let eventsPeriod = 0; - - for (const layer of rateLimitLayers) { - if (!layer) { - continue; - } - - const limit = layer.N as number; - const period = layer.T as number; - - if (limit !== undefined && limit > 0) { - eventsLimit = limit; - } - - if (period !== undefined && period > 0) { - eventsPeriod = period; - } - } - - if (eventsLimit <= 0 || eventsPeriod <= 0) { - return null; - } - - return { - eventsLimit, - eventsPeriod, - }; - } - /** * Trims source code lines in event's backtrace to prevent memory leaks * diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index bed08418..cc009cd5 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -111,57 +111,6 @@ export default class RedisHelper { return result === null; } - /** - * Increments redis counter used for rate limiting - * - * @param projectId - id of the project which event belongs to - * @param eventsPeriod - rate limit period configured for the project - * @param limit - current event count limit (from project / workspace / plan) - */ - public async incrementRateLimitCounterForCurrentEvent(projectId: string, eventsPeriod: number, limit: number): Promise { - const script = ` - local key = KEYS[1] - local field = ARGV[1] - local now = tonumber(ARGV[2]) - local period = tonumber(ARGV[3]) - local limit = tonumber(ARGV[4]) - - local current = redis.call('HGET', key, field) - - -- If no record yet, start a new window with count = 1 - if not current then - redis.call('HSET', key, field, now .. ':1') - return - end - - local timestamp, count = string.match(current, '(%d+):(%d+)') - timestamp = tonumber(timestamp) - count = tonumber(count) - - -- Check if we're in a new time window - if now - timestamp >= period then - redis.call('HSET', key, field, now .. ':1') - return - end - - -- Check if incrementing would exceed limit - if count + 1 > limit then - return - end - - -- Increment counter - redis.call('HSET', key, field, timestamp .. ':' .. (count + 1)) - `; - - const key = 'rate_limits'; - const now = Math.floor(Date.now() / MS_IN_SEC); - - await this.redisClient.eval(script, { - keys: [ key ], - arguments: [projectId, now.toString(), eventsPeriod.toString(), limit.toString()], - }); - } - /** * Creates a RedisTimeSeries key if it doesn't exist. * diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index 34bad1cc..2986bf7f 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -143,48 +143,6 @@ describe('GrouperWorker', () => { let plansCollection: Collection; let redisClient: RedisClientType; let worker: GrouperWorker; - const setPlanRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { - await plansCollection.updateOne( - { _id: planIdMock }, - { - $set: { - rateLimitSettings: { - N: eventsLimit, - T: eventsPeriod, - }, - }, - }, - { upsert: true } - ); - }; - const setWorkspaceRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { - await workspacesCollection.updateOne( - { _id: workspaceIdMock }, - { - $set: { - rateLimitSettings: { - N: eventsLimit, - T: eventsPeriod, - }, - }, - }, - { upsert: true } - ); - }; - const setProjectRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { - await projectsCollection.updateOne( - { _id: new mongodb.ObjectId(projectIdMock) }, - { - $set: { - rateLimitSettings: { - N: eventsLimit, - T: eventsPeriod, - }, - }, - } - ); - }; - beforeAll(async () => { worker = new GrouperWorker(); @@ -229,13 +187,9 @@ describe('GrouperWorker', () => { */ beforeEach(async () => { worker.clearCache(); - delete (worker as any)['memoizeCache:getProjectRateLimitSettings']; await eventsCollection.deleteMany({}); await dailyEventsCollection.deleteMany({}); await repetitionsCollection.deleteMany({}); - await setPlanRateLimit(0, 0); - await setWorkspaceRateLimit(0, 0); - await setProjectRateLimit(0, 0); }); afterEach(async () => { @@ -821,175 +775,6 @@ describe('GrouperWorker', () => { }); }); - describe('Rate limits counter increment', () => { - const rateLimitsKey = 'rate_limits'; - - test('increments counter when handling an event', async () => { - await setProjectRateLimit(5, 60); - - const currentTime = 1_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - await worker.handle(generateTask()); - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).toBe(`${Math.floor(currentTime / 1000)}:1`); - } finally { - nowSpy.mockRestore(); - } - }); - - test('reuses window and increments while within limit', async () => { - await setProjectRateLimit(5, 60); - - const currentTime = 2_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - await worker.handle(generateTask()); - await worker.handle(generateTask()); - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(2); - } finally { - nowSpy.mockRestore(); - } - }); - - test('does not exceed configured limit within same window', async () => { - const eventsLimit = 3; - - await setProjectRateLimit(eventsLimit, 60); - - const currentTime = 3_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < eventsLimit + 2; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(eventsLimit); - } finally { - nowSpy.mockRestore(); - } - }); - - test('resets window after period elapses', async () => { - await setProjectRateLimit(5, 2); - - let currentTime = 4_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - await worker.handle(generateTask()); - - currentTime += 3_000; // advance by 3 seconds - - await worker.handle(generateTask()); - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [timestamp, count] = (storedValue as string).split(':'); - - expect(Number(timestamp)).toBe(Math.floor(currentTime / 1000)); - expect(Number(count)).toBe(1); - } finally { - nowSpy.mockRestore(); - } - }); - - test('uses workspace limits when project overrides are absent', async () => { - await setPlanRateLimit(10, 60); - await setWorkspaceRateLimit(3, 60); - await setProjectRateLimit(0, 0); - - const currentTime = 5_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < 5; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(3); - } finally { - nowSpy.mockRestore(); - } - }); - - test('falls back to plan limits when workspace settings are empty', async () => { - await setPlanRateLimit(4, 60); - await setWorkspaceRateLimit(0, 0); - await setProjectRateLimit(0, 0); - - const currentTime = 6_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < 6; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(4); - } finally { - nowSpy.mockRestore(); - } - }); - - test('prefers project limits over workspace and plan', async () => { - await setPlanRateLimit(4, 60); - await setWorkspaceRateLimit(6, 60); - await setProjectRateLimit(8, 60); - - const currentTime = 7_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < 10; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(8); - } finally { - nowSpy.mockRestore(); - } - }); - }); - describe('Events-stored metrics', () => { test('writes minutely, hourly, and daily samples after handling an event', async () => { const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); From 7f0b557085647cb23d026ef406095ca6a352c0b5 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 20:08:15 +0300 Subject: [PATCH 05/11] chore(grouper): remove redundant mocks --- workers/grouper/tests/index.test.ts | 34 ----------------------------- 1 file changed, 34 deletions(-) diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index 2986bf7f..af1c2cf2 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -58,41 +58,16 @@ const projectIdMock = '5d206f7f9aaf7c0071d64596'; /** * Mock project data */ -const planIdMock = new mongodb.ObjectId(); -const workspaceIdMock = new mongodb.ObjectId(); - -const planMock = { - _id: planIdMock, - rateLimitSettings: { - N: 0, - T: 0, - }, -}; - -const workspaceMock = { - _id: workspaceIdMock, - tariffPlanId: planIdMock, - rateLimitSettings: { - N: 0, - T: 0, - }, -}; - const projectMock = { _id: new mongodb.ObjectId(projectIdMock), id: projectIdMock, name: 'Test Project', token: 'test-token', - workspaceId: workspaceIdMock, uidAdded: { id: 'test-user-id', }, unreadCount: 0, description: 'Test project for grouper worker tests', - rateLimitSettings: { - N: 0, - T: 0, - }, eventGroupingPatterns: [ { _id: mongodb.ObjectId(), pattern: 'New error .*', @@ -139,8 +114,6 @@ describe('GrouperWorker', () => { let dailyEventsCollection: Collection; let repetitionsCollection: Collection; let projectsCollection: Collection; - let workspacesCollection: Collection; - let plansCollection: Collection; let redisClient: RedisClientType; let worker: GrouperWorker; beforeAll(async () => { @@ -160,17 +133,12 @@ describe('GrouperWorker', () => { dailyEventsCollection = connection.db().collection('dailyEvents:' + projectIdMock); repetitionsCollection = connection.db().collection('repetitions:' + projectIdMock); projectsCollection = accountsConnection.db().collection('projects'); - workspacesCollection = accountsConnection.db().collection('workspaces'); - plansCollection = accountsConnection.db().collection('plans'); /** * Create unique index for groupHash */ await eventsCollection.createIndex({ groupHash: 1 }, { unique: true }); - await plansCollection.insertOne(planMock); - await workspacesCollection.insertOne(workspaceMock); - /** * Insert mock project into accounts database */ @@ -857,8 +825,6 @@ describe('GrouperWorker', () => { await redisClient.quit(); await worker.finish(); await projectsCollection.deleteMany({}); - await workspacesCollection.deleteMany({}); - await plansCollection.deleteMany({}); await accountsConnection.close(); await connection.close(); }); From 020d8bd149d7c585bfb3d7fcc3f8153cfd886a43 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 20:09:47 +0300 Subject: [PATCH 06/11] chore(): eslint fix --- workers/grouper/src/redisHelper.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index cc009cd5..d15df940 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -2,7 +2,6 @@ import HawkCatcher from '@hawk.so/nodejs'; import type { RedisClientType } from 'redis'; import { createClient } from 'redis'; import createLogger from '../../../lib/logger'; -import { MS_IN_SEC } from '../../../lib/utils/consts'; /** * Class with helper functions for working with Redis From 1836ba6c6535b1fd69cafb2860c879b6a9ad5fed Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 21:12:44 +0300 Subject: [PATCH 07/11] chore(): change metric type --- workers/grouper/src/index.ts | 2 +- workers/grouper/tests/index.test.ts | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 34792177..1135e108 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -308,7 +308,7 @@ export default class GrouperWorker extends Worker { } } - await this.recordProjectMetrics(task.projectId, 'events-stored'); + await this.recordProjectMetrics(task.projectId, 'events-accepted'); } /** diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index af1c2cf2..6ad01812 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -743,7 +743,7 @@ describe('GrouperWorker', () => { }); }); - describe('Events-stored metrics', () => { + describe('Events-accepted metrics', () => { test('writes minutely, hourly, and daily samples after handling an event', async () => { const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); @@ -754,27 +754,27 @@ describe('GrouperWorker', () => { const expectedLabels = { type: 'error', - status: 'events-stored', + status: 'events-accepted', project: projectIdMock, }; expect(safeTsAddSpy).toHaveBeenNthCalledWith( 1, - `ts:project-events-stored:${projectIdMock}:minutely`, + `ts:project-events-accepted:${projectIdMock}:minutely`, 1, expectedLabels, TimeMs.DAY ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 2, - `ts:project-events-stored:${projectIdMock}:hourly`, + `ts:project-events-accepted:${projectIdMock}:hourly`, 1, expectedLabels, TimeMs.WEEK ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 3, - `ts:project-events-stored:${projectIdMock}:daily`, + `ts:project-events-accepted:${projectIdMock}:daily`, 1, expectedLabels, 90 * TimeMs.DAY @@ -799,7 +799,7 @@ describe('GrouperWorker', () => { try { await worker.handle(generateTask()); - expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-stored', failure); + expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-accepted', failure); expect(await eventsCollection.find().count()).toBe(1); } finally { safeTsAddSpy.mockRestore(); @@ -814,7 +814,7 @@ describe('GrouperWorker', () => { await worker.handle(generateTask()); expect(recordMetricsSpy).toHaveBeenCalledTimes(1); - expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-stored'); + expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-accepted'); } finally { recordMetricsSpy.mockRestore(); } From 9db962e65d812dcd9cf4f57a7674a7f537cf0375 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 21:44:50 +0300 Subject: [PATCH 08/11] Update workers/grouper/src/index.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- workers/grouper/src/index.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 1135e108..2aa42199 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -362,13 +362,19 @@ export default class GrouperWorker extends Worker { }, ]; - for (const { key, label, retentionMs } of series) { - try { - await this.redis.safeTsAdd(key, 1, labels, retentionMs); - } catch (error) { - this.logger.error(`Failed to add ${label} TS for ${metricType}`, error); + const operations = series.map(({ key, label, retentionMs }) => ({ + label, + promise: this.redis.safeTsAdd(key, 1, labels, retentionMs), + })); + + const results = await Promise.allSettled(operations.map((op) => op.promise)); + + results.forEach((result, index) => { + if (result.status === 'rejected') { + const { label } = operations[index]; + this.logger.error(`Failed to add ${label} TS for ${metricType}`, result.reason); } - } + }); } /** From b6e85ed37083d7705eb67610490048ee840726ad Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 22:06:29 +0300 Subject: [PATCH 09/11] imp(): use lua for create if not exists, to avoid race-cond --- workers/grouper/src/redisHelper.ts | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index d15df940..ec486ddb 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -122,13 +122,16 @@ export default class RedisHelper { labels: Record, retentionMs = 0 ): Promise { - const exists = await this.redisClient.exists(key); + const script = ` + if redis.call('EXISTS', KEYS[1]) == 1 then + return 0 + end - if (exists > 0) { - return; - } + redis.call('TS.CREATE', KEYS[1], unpack(ARGV)) + return 1 + `; - const args: string[] = ['TS.CREATE', key]; + const args: string[] = []; if (retentionMs > 0) { args.push('RETENTION', Math.floor(retentionMs).toString()); @@ -136,7 +139,13 @@ export default class RedisHelper { args.push(...this.buildLabelArguments(labels)); - await this.redisClient.sendCommand(args); + await this.redisClient.eval( + script, + { + keys: [key], + arguments: args, + } + ); } /** From 8d54dc9badda95a5da8816b9dbab91f6b1450318 Mon Sep 17 00:00:00 2001 From: Peter Savchenko Date: Tue, 23 Dec 2025 23:40:34 +0300 Subject: [PATCH 10/11] add logs to sentry worker --- workers/sentry/src/index.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/workers/sentry/src/index.ts b/workers/sentry/src/index.ts index d9e3dde9..be57bcfb 100644 --- a/workers/sentry/src/index.ts +++ b/workers/sentry/src/index.ts @@ -41,7 +41,6 @@ export default class SentryEventWorker extends Worker { if (items.length === 0) { this.logger.warn('Received envelope with no items'); - return; } @@ -50,7 +49,6 @@ export default class SentryEventWorker extends Worker { for (const item of items) { const result = await this.handleEnvelopeItem(headers, item, event.projectId); - if (result === 'processed') { processedCount++; } else if (result === 'skipped') { @@ -249,15 +247,16 @@ export default class SentryEventWorker extends Worker { if (!taskSent) { /** * If addTask returns false, the message was not queued (queue full or channel closed) + * This is a critical error that should be logged and thrown */ const error = new Error(`Failed to queue event to ${workerName} worker. Queue may be full or channel closed.`); - this.logger.error(error.message); this.logger.info('👇 Here is the event that failed to queue:'); this.logger.json(hawkEvent); throw error; } + this.logger.verbose(`Successfully queued event to ${workerName} worker`); return 'processed'; } catch (error) { this.logger.error('Error handling envelope item:', error); From db3d530e82c8480ff6147242d1414eedba44f8e3 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Tue, 24 Feb 2026 03:23:54 +0300 Subject: [PATCH 11/11] chore(): remove redundant import --- workers/grouper/src/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 2aa42199..998142fe 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -31,7 +31,6 @@ import { hasValue } from '../../../lib/utils/hasValue'; */ /* eslint-disable-next-line no-unused-vars */ import { memoize } from '../../../lib/memoize'; -import TimeMs from '../../../lib/utils/time'; /** * eslint does not count decorators as a variable usage