diff --git a/services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts b/services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts new file mode 100644 index 0000000000..42b05a1652 --- /dev/null +++ b/services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts @@ -0,0 +1,111 @@ +import CronTime from 'cron-time-generator' + +import { IS_PROD_ENV } from '@crowd/common' +import { IntegrationStreamWorkerEmitter } from '@crowd/common_services' +import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' +import { QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue' +import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client' +import { WebhookState } from '@crowd/types' + +import { IJobDefinition } from '../types' + +const TOPIC = 'integration-stream-worker-high-production' +const GROUP_ID = 'integration-stream-worker-high-production' +const MAX_UNCONSUMED = 50000 + +const job: IJobDefinition = { + name: 'incoming-webhooks-check', + cronTime: CronTime.everyDay(), + timeout: 30 * 60, // 30 minutes + enabled: async () => IS_PROD_ENV, + process: async (ctx) => { + const kafkaClient = getKafkaClient(QUEUE_CONFIG()) + const admin = kafkaClient.admin() + await admin.connect() + + let counts: { total: number; consumed: number; unconsumed: number } + try { + counts = await getKafkaMessageCounts(ctx.log, admin, TOPIC, GROUP_ID) + } finally { + await admin.disconnect() + } + + if (counts.unconsumed >= MAX_UNCONSUMED) { + ctx.log.info( + `Integration stream worker queue has ${counts.unconsumed} unconsumed messages, skipping!`, + ) + return + } + + const dbConnection = await getDbConnection(WRITE_DB_CONFIG()) + + // Clean up orphaned webhooks whose integration was deleted (hard or soft). + // incomingWebhooks has no FK constraint on integrationId so these accumulate silently. + const deleted = await dbConnection.result( + ` + delete from "incomingWebhooks" iw + where not exists ( + select 1 from integrations i + where i.id = iw."integrationId" + and i."deletedAt" is null + ) + `, + ) + if (deleted.rowCount > 0) { + ctx.log.info( + `Deleted ${deleted.rowCount} orphaned webhooks with missing or deleted integrations!`, + ) + } + + const count = ( + await dbConnection.one( + ` + select count(*)::int as count + from "incomingWebhooks" iw + join integrations i on iw."integrationId" = i.id and i."deletedAt" is null + where iw.state = $(state) + and iw."createdAt" < now() - interval '1 day' + `, + { state: WebhookState.PENDING }, + ) + ).count + + if (count <= counts.unconsumed) { + ctx.log.info(`All ${count} stuck pending webhooks are already in the queue, skipping!`) + return + } + + const webhooks = await dbConnection.any<{ id: string; platform: string }>( + ` + select iw.id, i.platform + from "incomingWebhooks" iw + join integrations i on iw."integrationId" = i.id and i."deletedAt" is null + where iw.state = $(state) + and iw."createdAt" < now() - interval '1 day' + order by iw."createdAt" asc + limit 10000 + `, + { state: WebhookState.PENDING }, + ) + + if (webhooks.length === 0) { + ctx.log.info('No stuck pending webhooks found!') + return + } + + ctx.log.info(`Found ${webhooks.length} stuck pending webhooks, re-triggering!`) + + const queueService = new KafkaQueueService(kafkaClient, ctx.log) + const emitter = new IntegrationStreamWorkerEmitter(queueService, ctx.log) + await emitter.init() + + await emitter.triggerWebhookProcessingBatch( + webhooks.map((w) => w.id), + true, + ) + + ctx.log.info(`Re-triggered ${webhooks.length} stuck pending webhooks in total!`) + }, +} + +export default job diff --git a/services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts b/services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts index 4c6ac2b69b..10598f8db6 100644 --- a/services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts +++ b/services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts @@ -1,10 +1,9 @@ import CronTime from 'cron-time-generator' -import { generateUUIDv1, partition } from '@crowd/common' +import { IS_PROD_ENV, generateUUIDv1, partition } from '@crowd/common' import { DataSinkWorkerEmitter } from '@crowd/common_services' import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' -import { Logger } from '@crowd/logging' -import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient } from '@crowd/queue' +import { QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue' import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client' import { DataSinkWorkerQueueMessageType, IntegrationResultState } from '@crowd/types' @@ -14,6 +13,7 @@ const job: IJobDefinition = { name: 'integration-results-check', cronTime: CronTime.every(10).minutes(), timeout: 30 * 60, // 30 minutes + enabled: async () => IS_PROD_ENV, process: async (ctx) => { const topic = 'data-sink-worker-normal-production' const groupId = 'data-sink-worker-normal-production' @@ -22,107 +22,65 @@ const job: IJobDefinition = { const admin = kafkaClient.admin() await admin.connect() - const counts = await getMessageCounts(ctx.log, admin, topic, groupId) + try { + const counts = await getKafkaMessageCounts(ctx.log, admin, topic, groupId) - // if we have less than 50k messages in the queue we can trigger 50k oldest results (we process between 100k and 300k results per hour on average) - if (counts.unconsumed < 50000) { - const dbConnection = await getDbConnection(WRITE_DB_CONFIG()) + // if we have less than 50k messages in the queue we can trigger 50k oldest results (we process between 100k and 300k results per hour on average) + if (counts.unconsumed < 50000) { + const dbConnection = await getDbConnection(WRITE_DB_CONFIG()) - // we check if we have more than unconsumed pending results so that we don't trigger just the ones in the queue :) - const count = ( - await dbConnection.one( - `select count(*) as count from integration.results where state = '${IntegrationResultState.PENDING}'`, - ) - ).count + // we check if we have more than unconsumed pending results so that we don't trigger just the ones in the queue :) + const count = ( + await dbConnection.one( + `select count(*) as count from integration.results where state = '${IntegrationResultState.PENDING}'`, + ) + ).count - if (count > counts.unconsumed) { - ctx.log.info(`We have ${count} pending results, triggering 100k oldest results!`) + if (count > counts.unconsumed) { + ctx.log.info(`We have ${count} pending results, triggering 100k oldest results!`) - const queueService = new KafkaQueueService(kafkaClient, ctx.log) - const dswEmitter = new DataSinkWorkerEmitter(queueService, ctx.log) - await dswEmitter.init() + const queueService = new KafkaQueueService(kafkaClient, ctx.log) + const dswEmitter = new DataSinkWorkerEmitter(queueService, ctx.log) + await dswEmitter.init() - const resultIds = ( - await dbConnection.any( - `select id from integration.results where state = 'pending' order by "createdAt" desc limit 100000`, - ) - ).map((r) => r.id) - - let triggered = 0 - - for (const batch of partition(resultIds, 10)) { - const messages = batch.map((resultId) => { - return { - payload: { - type: DataSinkWorkerQueueMessageType.PROCESS_INTEGRATION_RESULT, - resultId, - }, - groupId: generateUUIDv1(), - deduplicationId: resultId, - } - }) + const resultIds = ( + await dbConnection.any( + `select id from integration.results where state = 'pending' order by "createdAt" desc limit 100000`, + ) + ).map((r) => r.id) + + let triggered = 0 - await dswEmitter.sendMessages(messages) + for (const batch of partition(resultIds, 10)) { + const messages = batch.map((resultId) => { + return { + payload: { + type: DataSinkWorkerQueueMessageType.PROCESS_INTEGRATION_RESULT, + resultId, + }, + groupId: generateUUIDv1(), + deduplicationId: resultId, + } + }) - triggered += batch.length + await dswEmitter.sendMessages(messages) - if (triggered % 1000 === 0) { - ctx.log.info(`Triggered ${triggered} results!`) + triggered += batch.length + + if (triggered % 1000 === 0) { + ctx.log.info(`Triggered ${triggered} results!`) + } } - } - ctx.log.info(`Triggered ${triggered} results in total!`) + ctx.log.info(`Triggered ${triggered} results in total!`) + } + } else { + ctx.log.info(`We have ${counts.unconsumed} unconsumed messages in the queue, skipping!`) } - } else { - ctx.log.info(`We have ${counts.unconsumed} unconsumed messages in the queue, skipping!`) + } finally { + await admin.disconnect() } }, } -async function getMessageCounts( - log: Logger, - admin: KafkaAdmin, - topic: string, - groupId: string, -): Promise<{ - total: number - consumed: number - unconsumed: number -}> { - try { - const topicOffsets = await admin.fetchTopicOffsets(topic) - const offsetsResponse = await admin.fetchOffsets({ - groupId: groupId, - topics: [topic], - }) - - const offsets = offsetsResponse[0].partitions - - let totalMessages = 0 - let consumedMessages = 0 - let totalLeft = 0 - - for (const offset of offsets) { - const topicOffset = topicOffsets.find((p) => p.partition === offset.partition) - if (topicOffset) { - // Total messages is the latest offset - totalMessages += Number(topicOffset.offset) - // Consumed messages is the consumer group's offset - consumedMessages += Number(offset.offset) - // Unconsumed is the difference - totalLeft += Number(topicOffset.offset) - Number(offset.offset) - } - } - - return { - total: totalMessages, - consumed: consumedMessages, - unconsumed: totalLeft, - } - } catch (err) { - log.error(err, 'Failed to get message count!') - throw err - } -} - export default job diff --git a/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts b/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts new file mode 100644 index 0000000000..5ccd1630a4 --- /dev/null +++ b/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts @@ -0,0 +1,160 @@ +import CronTime from 'cron-time-generator' + +import { IS_DEV_ENV, IS_PROD_ENV } from '@crowd/common' +import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' +import { + SlackChannel, + SlackMessageSection, + SlackPersona, + sendSlackNotificationAsync, +} from '@crowd/slack' +import { IntegrationResultState } from '@crowd/types' + +import { IJobDefinition } from '../types' + +interface IResultStateCount { + state: string + count: number +} + +interface IErrorGroup { + errorMessage: string + location: string + message: string + count: number + avgRetries: number + maxRetries: number + oldest: Date + newest: Date + platforms: string | null +} + +const job: IJobDefinition = { + name: 'integration-results-reporting', + cronTime: IS_DEV_ENV ? CronTime.everyMinute() : CronTime.everyDayAt(8, 30), + timeout: 10 * 60, // 10 minutes + enabled: async () => IS_PROD_ENV, + process: async (ctx) => { + ctx.log.info('Running integration-results-reporting job...') + + const dbConnection = await getDbConnection(READ_DB_CONFIG(), 3, 0) + + // Count results per state + const stateCounts = await dbConnection.any( + `SELECT state, count(*)::int AS count FROM integration.results GROUP BY state ORDER BY count DESC`, + ) + + const countByState: Record = {} + for (const row of stateCounts) { + countByState[row.state] = row.count + } + + const pending = countByState[IntegrationResultState.PENDING] ?? 0 + const processing = countByState[IntegrationResultState.PROCESSING] ?? 0 + const processed = countByState[IntegrationResultState.PROCESSED] ?? 0 + const delayed = countByState[IntegrationResultState.DELAYED] ?? 0 + const errorCount = countByState[IntegrationResultState.ERROR] ?? 0 + const total = pending + processing + processed + delayed + errorCount + + // How many delayed results are overdue (i.e. should already be processed) + const overdueDelayed = ( + await dbConnection.one<{ count: number }>( + `SELECT count(*)::int AS count FROM integration.results WHERE state = 'delayed' AND "delayedUntil" < now()`, + ) + ).count + + // Break down errors by errorMessage + location, enriched with platform info + const errorGroups = await dbConnection.any( + ` + SELECT + COALESCE(r.error->>'errorMessage', '[no errorMessage]') AS "errorMessage", + COALESCE(r.error->>'location', '[no location]') AS location, + COALESCE(r.error->>'message', '[no message]') AS message, + count(*)::int AS count, + round(avg(r.retries), 1)::float AS "avgRetries", + max(r.retries)::int AS "maxRetries", + min(r."createdAt") AS oldest, + max(r."updatedAt") AS newest, + string_agg(DISTINCT i.platform, ', ' ORDER BY i.platform) AS platforms + FROM integration.results r + LEFT JOIN integrations i ON i.id = r."integrationId" + WHERE r.state = 'error' + GROUP BY + r.error->>'errorMessage', + r.error->>'location', + r.error->>'message' + ORDER BY count DESC + LIMIT 20 + `, + ) + + const sections: SlackMessageSection[] = [] + + sections.push({ + title: 'Integration Results Summary', + text: [ + `*Total:* ${total.toLocaleString()}`, + '', + `⏳ Pending: *${pending.toLocaleString()}*`, + `⚙️ Processing: *${processing.toLocaleString()}*`, + `✅ Processed: *${processed.toLocaleString()}*`, + `🕐 Delayed: *${delayed.toLocaleString()}*${overdueDelayed > 0 ? ` (${overdueDelayed.toLocaleString()} overdue)` : ''}`, + `❌ Error: *${errorCount.toLocaleString()}*`, + ].join('\n'), + }) + + if (errorCount > 0 && errorGroups.length > 0) { + const lines: string[] = [ + `Top ${errorGroups.length} error group${errorGroups.length !== 1 ? 's' : ''} out of *${errorCount.toLocaleString()}* total errors:`, + '', + ] + + for (const group of errorGroups) { + const oldestHoursAgo = Math.round( + (Date.now() - new Date(group.oldest).getTime()) / 3_600_000, + ) + const newestHoursAgo = Math.round( + (Date.now() - new Date(group.newest).getTime()) / 3_600_000, + ) + const ageLabel = + oldestHoursAgo === newestHoursAgo + ? formatHoursAgo(oldestHoursAgo) + : `${formatHoursAgo(newestHoursAgo)} – ${formatHoursAgo(oldestHoursAgo)}` + + lines.push( + `• *${group.count}x* \`${group.errorMessage}\``, + ` _Location:_ \`${group.location}\` | _retries avg/max:_ ${group.avgRetries}/${group.maxRetries}${group.platforms ? ` | _platforms:_ \`${group.platforms}\`` : ''}`, + ` _Age:_ ${ageLabel}`, + ` _Detail:_ ${group.message}`, + '', + ) + } + + sections.push({ + title: `Error Breakdown (top ${errorGroups.length})`, + text: lines.join('\n'), + }) + } + + const persona = errorCount > 0 ? SlackPersona.WARNING_PROPAGATOR : SlackPersona.INFO_NOTIFIER + + await sendSlackNotificationAsync( + SlackChannel.CDP_INTEGRATIONS_ALERTS, + persona, + 'Integration Results Daily Report', + sections, + ) + + ctx.log.info( + `Integration results report sent: pending=${pending}, delayed=${delayed} (${overdueDelayed} overdue), errors=${errorCount}`, + ) + }, +} + +function formatHoursAgo(hours: number): string { + if (hours < 1) return 'just now' + if (hours < 24) return `${hours}h ago` + return `${Math.round(hours / 24)}d ago` +} + +export default job diff --git a/services/apps/cron_service/src/jobs/queueMonitoring.job.ts b/services/apps/cron_service/src/jobs/queueMonitoring.job.ts index 5ddc20f4d3..ff1e02a44c 100644 --- a/services/apps/cron_service/src/jobs/queueMonitoring.job.ts +++ b/services/apps/cron_service/src/jobs/queueMonitoring.job.ts @@ -2,7 +2,7 @@ import CronTime from 'cron-time-generator' import { IS_PROD_ENV, distinct, timeout } from '@crowd/common' import { Logger } from '@crowd/logging' -import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient } from '@crowd/queue' +import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue' import { SlackChannel, SlackPersona, sendSlackNotificationAsync } from '@crowd/slack' import telemetry from '@crowd/telemetry' @@ -41,7 +41,7 @@ const job: IJobDefinition = { } for (const group of groups) { - const counts = await getMessageCounts(ctx.log, admin, topic, group) + const counts = await getKafkaMessageCounts(ctx.log, admin, topic, group) ctx.log.info( `Topic ${topic} group ${group} has ${counts.total} total messages, ${counts.consumed} consumed, ${counts.unconsumed} unconsumed!`, ) @@ -181,61 +181,6 @@ async function isConsumerListeningToTopic( } } -async function getMessageCounts( - log: Logger, - admin: KafkaAdmin, - topic: string, - groupId: string, -): Promise<{ - total: number - consumed: number - unconsumed: number -}> { - try { - const topicOffsets = await admin.fetchTopicOffsets(topic) - const offsetsResponse = await admin.fetchOffsets({ - groupId: groupId, - topics: [topic], - }) - - const offsets = offsetsResponse[0].partitions - - let totalMessages = 0 - let consumedMessages = 0 - let totalLeft = 0 - - for (const offset of offsets) { - const topicOffset = topicOffsets.find((p) => p.partition === offset.partition) - if (topicOffset) { - // Total messages is the latest offset - totalMessages += Number(topicOffset.offset) - - // Handle -1 offsets (no committed offset) - if (offset.offset === '-1') { - // No committed offset means no messages consumed from this partition - consumedMessages += 0 - // Unconsumed is the total messages in the partition - totalLeft += Number(topicOffset.offset) - Number(topicOffset.low) - } else { - // Consumed messages is the consumer group's offset - consumedMessages += Number(offset.offset) - // Unconsumed is the difference - totalLeft += Number(topicOffset.offset) - Number(offset.offset) - } - } - } - - return { - total: totalMessages, - consumed: consumedMessages, - unconsumed: totalLeft, - } - } catch (err) { - log.error(err, 'Failed to get message count!') - throw err - } -} - async function getTopicMessageCount( log: Logger, admin: KafkaAdmin, diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 6435ea1362..2c87ad2ea5 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -266,13 +266,25 @@ export default class ActivityService extends LoggerBase { const results = new Map() for (const { resultId, activity, platform } of data) { + // Guard against results whose data.data is missing or not an activity object. + // Without the !activity check, accessing activity.username throws a TypeError that + // propagates out of prepareMemberData and crashes the entire batch, marking all other + // results in the batch with the same error even though they are valid. + if (!activity) { + this.log.error({ platform }, 'Activity data is missing.') + results.set(resultId, { + success: false, + err: new UnrepeatableError('Activity data is missing.'), + }) + continue + } + if (!activity.username && !activity.member) { this.log.error({ platform, activity }, 'Activity does not have a username or member.') results.set(resultId, { success: false, err: new UnrepeatableError('Activity does not have a username or member.'), }) - continue } diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index 29cfb23b2a..54fcd50dfa 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -97,7 +97,14 @@ export default class DataSinkService extends LoggerBase { }` } - if (errorData.errorMessage.includes('uix_memberIdentities_platform_value_type_verified')) { + // Identity conflict errors get a long random delay to let concurrent upserts settle. + // Previously this called delayResult() unconditionally, bypassing maxStreamRetries and + // allowing retries to grow without bound. Now we respect the retry limit so the row + // eventually reaches ERROR state instead of cycling forever. + if ( + errorData.errorMessage.includes('uix_memberIdentities_platform_value_type_verified') && + resultInfo.retries + 1 <= WORKER_SETTINGS().maxStreamRetries + ) { const delaySeconds = Math.floor(Math.random() * (120 - 10 + 1) + 10) * 60 const until = addSeconds(new Date(), delaySeconds) this.log.warn( diff --git a/services/apps/integration_stream_worker/src/service/integrationStreamService.ts b/services/apps/integration_stream_worker/src/service/integrationStreamService.ts index d9028e20a9..c65e706b81 100644 --- a/services/apps/integration_stream_worker/src/service/integrationStreamService.ts +++ b/services/apps/integration_stream_worker/src/service/integrationStreamService.ts @@ -374,7 +374,7 @@ export default class IntegrationStreamService extends LoggerBase { await integrationService.processWebhookStream(context) this.log.debug('Finished processing webhook stream!') await this.repo.deleteStream(streamId) - await this.webhookRepo.markWebhookProcessed(webhookId) + await this.webhookRepo.deleteWebhook(webhookId) return true } catch (err) { this.log.error(err, 'Error while processing webhook stream!') diff --git a/services/libs/common_services/src/services/emitters/integrationStreamWorker.emitter.ts b/services/libs/common_services/src/services/emitters/integrationStreamWorker.emitter.ts index 6efa6864f3..1697f8cdab 100644 --- a/services/libs/common_services/src/services/emitters/integrationStreamWorker.emitter.ts +++ b/services/libs/common_services/src/services/emitters/integrationStreamWorker.emitter.ts @@ -1,4 +1,4 @@ -import { generateUUIDv1 } from '@crowd/common' +import { generateUUIDv1, partition } from '@crowd/common' import { Logger } from '@crowd/logging' import { CrowdQueue, IQueue } from '@crowd/queue' import { @@ -59,4 +59,25 @@ export class IntegrationStreamWorkerEmitter extends QueuePriorityService { { onboarding: true }, ) } + + // Sends in parallel batches of 10. Uses concurrent sendMessage calls (not sendMessages) + // so the priority context (onboarding flag) is respected — sendMessages() has no way to + // pass a priority override and always routes to NORMAL. + public async triggerWebhookProcessingBatch( + webhookIds: string[], + onboarding: boolean, + ): Promise { + for (const batch of partition(webhookIds, 10)) { + await Promise.all( + batch.map((webhookId) => + this.sendMessage( + generateUUIDv1(), + new ProcessWebhookStreamQueueMessage(webhookId), + webhookId, + { onboarding }, + ), + ), + ) + } + } } diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts index eb95e10db5..11951aedc5 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts @@ -62,17 +62,19 @@ export default class DataSinkRepository extends RepositoryBase $(lastId)' : ''} order by r.id limit ${limit}; `, + // ERROR rows are intentionally excluded: ERROR is a terminal state — a row reaches it + // only after exhausting all retries or hitting an unrepeatable error. Re-queuing it + // without resetting retries means it will immediately error again on the next attempt. + // To retry a specific error row, reset it explicitly first (state = PENDING, retries = 0). { pendingState: IntegrationResultState.PENDING, delayedState: IntegrationResultState.DELAYED, - errorState: IntegrationResultState.ERROR, lastId, }, ) @@ -90,15 +92,14 @@ export default class DataSinkRepository extends RepositoryBase { - return ( - r.type === identity.type && - r.value === identity.value && - r.platform === identity.platform - ) - }) - - if (row) { - results.set(identity, true) - } else { - results.set(identity, false) - } + // Email is platform-agnostic: the SQL query also omits the platform filter for EMAIL. + // Using find (not singleOrDefault) because case-insensitive matching can return + // multiple rows for the same email stored with different casing. + const row = + data.find( + (r) => + r.type === identity.type && r.value.toLowerCase() === identity.value.toLowerCase(), + ) ?? null + + results.set(identity, row !== null) } else { - const row = singleOrDefault(data, (r) => { - return r.type === identity.type && r.value === identity.value - }) - - if (row) { - results.set(identity, true) - } else { - results.set(identity, false) - } + // The SQL query filters non-EMAIL identities by platform, so the in-memory filter + // must also include platform. Without it, two identities sharing (type, value) but + // on different platforms would both match. The previous singleOrDefault call threw + // "Array contains more than one matching element!" in that case — a deterministic + // crash that never self-heals. + const row = + data.find( + (r) => + r.type === identity.type && + r.value.toLowerCase() === identity.value.toLowerCase() && + r.platform === identity.platform, + ) ?? null + + results.set(identity, row !== null) } } diff --git a/services/libs/data-access-layer/src/old/apps/integration_stream_worker/incomingWebhook.repo.ts b/services/libs/data-access-layer/src/old/apps/integration_stream_worker/incomingWebhook.repo.ts index 5a6c2e3d1e..deea18d7dd 100644 --- a/services/libs/data-access-layer/src/old/apps/integration_stream_worker/incomingWebhook.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/integration_stream_worker/incomingWebhook.repo.ts @@ -35,29 +35,10 @@ export default class IncomingWebhookRepository extends RepositoryBase { - await this.db().none( - ` - update "incomingWebhooks" - set - state = $(state), - error = null, - "processedAt" = now() - where id = $(id) - `, - { - id, - state: WebhookState.PROCESSED, - }, - ) - } - public async deleteWebhook(id: string): Promise { - const result = await this.db().result(`delete from "incomingWebhooks" where id = $(id)`, { + await this.db().none(`delete from "incomingWebhooks" where id = $(id)`, { id, }) - - this.checkUpdateRowCount(result.rowCount, 1) } public async markWebhookPending(id: string): Promise { diff --git a/services/libs/queue/src/index.ts b/services/libs/queue/src/index.ts index 30a3040023..f3187ce0b6 100644 --- a/services/libs/queue/src/index.ts +++ b/services/libs/queue/src/index.ts @@ -1,5 +1,6 @@ export * from './types' export * from './queue' export * from './vendors/kafka/config' +export * from './vendors/kafka/utils' export * from './prioritization' export * from './factory' diff --git a/services/libs/queue/src/vendors/kafka/utils.ts b/services/libs/queue/src/vendors/kafka/utils.ts new file mode 100644 index 0000000000..463150ddd7 --- /dev/null +++ b/services/libs/queue/src/vendors/kafka/utils.ts @@ -0,0 +1,51 @@ +import { Admin } from 'kafkajs' + +import { Logger } from '@crowd/logging' + +/** + * Returns the total, consumed, and unconsumed message counts for a Kafka topic/consumer-group pair. + * + * Handles the `-1` offset edge case: when a consumer group has never committed an offset, + * Kafka returns `-1`. In that case all messages from the low watermark onward are unconsumed. + */ +export async function getKafkaMessageCounts( + log: Logger, + admin: Admin, + topic: string, + groupId: string, +): Promise<{ total: number; consumed: number; unconsumed: number }> { + try { + const topicOffsets = await admin.fetchTopicOffsets(topic) + const offsetsResponse = await admin.fetchOffsets({ + groupId, + topics: [topic], + }) + + const offsets = offsetsResponse[0].partitions + + let totalMessages = 0 + let consumedMessages = 0 + let totalLeft = 0 + + for (const offset of offsets) { + const topicOffset = topicOffsets.find((p) => p.partition === offset.partition) + if (topicOffset) { + totalMessages += Number(topicOffset.offset) + + if (offset.offset === '-1') { + // No committed offset yet — treat all messages from the low watermark as unconsumed. + consumedMessages += 0 + totalLeft += Number(topicOffset.offset) - Number(topicOffset.low) + } else { + consumedMessages += Number(offset.offset) + totalLeft += Number(topicOffset.offset) - Number(offset.offset) + } + } + } + + return { total: totalMessages, consumed: consumedMessages, unconsumed: totalLeft } + } catch (err) { + log.error(err, 'Failed to get Kafka message counts!') + throw err + } +}