Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import CronTime from 'cron-time-generator'

import { IS_PROD_ENV } from '@crowd/common'
import { IntegrationStreamWorkerEmitter } from '@crowd/common_services'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue'
import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client'
import { WebhookState } from '@crowd/types'

import { IJobDefinition } from '../types'

// Production topic/consumer-group for the integration stream worker queue.
// Hard-coded to the production names; the job is gated on IS_PROD_ENV below.
const TOPIC = 'integration-stream-worker-high-production'
const GROUP_ID = 'integration-stream-worker-high-production'
// If the queue already has this many unconsumed messages, skip the run entirely.
const MAX_UNCONSUMED = 50000

// Daily job that rescues "stuck" incoming webhooks: rows that have sat in
// PENDING state for more than a day are re-emitted to the integration stream
// worker so they get processed. It also deletes orphaned webhook rows whose
// integration was removed.
const job: IJobDefinition = {
  name: 'incoming-webhooks-check',
  cronTime: CronTime.everyDay(),
  timeout: 30 * 60, // 30 minutes
  enabled: async () => IS_PROD_ENV,
  process: async (ctx) => {
    const kafkaClient = getKafkaClient(QUEUE_CONFIG())
    const admin = kafkaClient.admin()
    await admin.connect()

    let counts: { total: number; consumed: number; unconsumed: number }
    try {
      counts = await getKafkaMessageCounts(ctx.log, admin, TOPIC, GROUP_ID)
    } finally {
      // always release the admin connection, even when fetching counts throws
      await admin.disconnect()
    }

    // A large backlog means the worker is already behind; re-triggering would
    // only pile more messages on. Bail out and let the next daily run retry.
    if (counts.unconsumed >= MAX_UNCONSUMED) {
      ctx.log.info(
        `Integration stream worker queue has ${counts.unconsumed} unconsumed messages, skipping!`,
      )
      return
    }

    const dbConnection = await getDbConnection(WRITE_DB_CONFIG())

    // Clean up orphaned webhooks whose integration was deleted (hard or soft).
    // incomingWebhooks has no FK constraint on integrationId so these accumulate silently.
    const deleted = await dbConnection.result(
      `
      delete from "incomingWebhooks" iw
      where not exists (
        select 1 from integrations i
        where i.id = iw."integrationId"
          and i."deletedAt" is null
      )
      `,
    )
    if (deleted.rowCount > 0) {
      ctx.log.info(
        `Deleted ${deleted.rowCount} orphaned webhooks with missing or deleted integrations!`,
      )
    }

    // Count webhooks (belonging to live integrations) stuck in PENDING for
    // over a day.
    const count = (
      await dbConnection.one(
        `
        select count(*)::int as count
        from "incomingWebhooks" iw
        join integrations i on iw."integrationId" = i.id and i."deletedAt" is null
        where iw.state = $(state)
          and iw."createdAt" < now() - interval '1 day'
        `,
        { state: WebhookState.PENDING },
      )
    ).count

    // If the stuck count does not exceed what is already sitting unconsumed in
    // the queue, those webhooks are presumably already enqueued — skip.
    if (count <= counts.unconsumed) {
      ctx.log.info(`All ${count} stuck pending webhooks are already in the queue, skipping!`)
      return
    }

    // Oldest first, capped at 10k per run so a single execution stays bounded.
    const webhooks = await dbConnection.any<{ id: string; platform: string }>(
      `
      select iw.id, i.platform
      from "incomingWebhooks" iw
      join integrations i on iw."integrationId" = i.id and i."deletedAt" is null
      where iw.state = $(state)
        and iw."createdAt" < now() - interval '1 day'
      order by iw."createdAt" asc
      limit 10000
      `,
      { state: WebhookState.PENDING },
    )

    if (webhooks.length === 0) {
      ctx.log.info('No stuck pending webhooks found!')
      return
    }

    ctx.log.info(`Found ${webhooks.length} stuck pending webhooks, re-triggering!`)

    const queueService = new KafkaQueueService(kafkaClient, ctx.log)
    const emitter = new IntegrationStreamWorkerEmitter(queueService, ctx.log)
    await emitter.init()

    // NOTE(review): the boolean second argument is presumably a force/priority
    // flag — confirm against IntegrationStreamWorkerEmitter.triggerWebhookProcessingBatch.
    await emitter.triggerWebhookProcessingBatch(
      webhooks.map((w) => w.id),
      true,
    )

    ctx.log.info(`Re-triggered ${webhooks.length} stuck pending webhooks in total!`)
  },
}

export default job
140 changes: 49 additions & 91 deletions services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import CronTime from 'cron-time-generator'

import { generateUUIDv1, partition } from '@crowd/common'
import { IS_PROD_ENV, generateUUIDv1, partition } from '@crowd/common'
import { DataSinkWorkerEmitter } from '@crowd/common_services'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { Logger } from '@crowd/logging'
import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient } from '@crowd/queue'
import { QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue'
import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client'
import { DataSinkWorkerQueueMessageType, IntegrationResultState } from '@crowd/types'

Expand All @@ -14,6 +13,7 @@ const job: IJobDefinition = {
name: 'integration-results-check',
cronTime: CronTime.every(10).minutes(),
timeout: 30 * 60, // 30 minutes
enabled: async () => IS_PROD_ENV,
process: async (ctx) => {
const topic = 'data-sink-worker-normal-production'
const groupId = 'data-sink-worker-normal-production'
Expand All @@ -22,107 +22,65 @@ const job: IJobDefinition = {
const admin = kafkaClient.admin()
await admin.connect()

const counts = await getMessageCounts(ctx.log, admin, topic, groupId)
try {
const counts = await getKafkaMessageCounts(ctx.log, admin, topic, groupId)

// if we have less than 50k messages in the queue we can trigger 50k oldest results (we process between 100k and 300k results per hour on average)
if (counts.unconsumed < 50000) {
const dbConnection = await getDbConnection(WRITE_DB_CONFIG())
// if we have less than 50k messages in the queue we can trigger 50k oldest results (we process between 100k and 300k results per hour on average)
if (counts.unconsumed < 50000) {
const dbConnection = await getDbConnection(WRITE_DB_CONFIG())

// we check if we have more than unconsumed pending results so that we don't trigger just the ones in the queue :)
const count = (
await dbConnection.one(
`select count(*) as count from integration.results where state = '${IntegrationResultState.PENDING}'`,
)
).count
// we check if we have more than unconsumed pending results so that we don't trigger just the ones in the queue :)
const count = (
await dbConnection.one(
`select count(*) as count from integration.results where state = '${IntegrationResultState.PENDING}'`,
)
).count

if (count > counts.unconsumed) {
ctx.log.info(`We have ${count} pending results, triggering 100k oldest results!`)
if (count > counts.unconsumed) {
ctx.log.info(`We have ${count} pending results, triggering 100k oldest results!`)

const queueService = new KafkaQueueService(kafkaClient, ctx.log)
const dswEmitter = new DataSinkWorkerEmitter(queueService, ctx.log)
await dswEmitter.init()
const queueService = new KafkaQueueService(kafkaClient, ctx.log)
const dswEmitter = new DataSinkWorkerEmitter(queueService, ctx.log)
await dswEmitter.init()

const resultIds = (
await dbConnection.any(
`select id from integration.results where state = 'pending' order by "createdAt" desc limit 100000`,
)
).map((r) => r.id)

let triggered = 0

for (const batch of partition(resultIds, 10)) {
const messages = batch.map((resultId) => {
return {
payload: {
type: DataSinkWorkerQueueMessageType.PROCESS_INTEGRATION_RESULT,
resultId,
},
groupId: generateUUIDv1(),
deduplicationId: resultId,
}
})
const resultIds = (
await dbConnection.any(
`select id from integration.results where state = 'pending' order by "createdAt" desc limit 100000`,
)
).map((r) => r.id)

let triggered = 0

await dswEmitter.sendMessages(messages)
for (const batch of partition(resultIds, 10)) {
const messages = batch.map((resultId) => {
return {
payload: {
type: DataSinkWorkerQueueMessageType.PROCESS_INTEGRATION_RESULT,
resultId,
},
groupId: generateUUIDv1(),
deduplicationId: resultId,
}
})

triggered += batch.length
await dswEmitter.sendMessages(messages)

if (triggered % 1000 === 0) {
ctx.log.info(`Triggered ${triggered} results!`)
triggered += batch.length

if (triggered % 1000 === 0) {
ctx.log.info(`Triggered ${triggered} results!`)
}
}
}

ctx.log.info(`Triggered ${triggered} results in total!`)
ctx.log.info(`Triggered ${triggered} results in total!`)
}
} else {
ctx.log.info(`We have ${counts.unconsumed} unconsumed messages in the queue, skipping!`)
}
} else {
ctx.log.info(`We have ${counts.unconsumed} unconsumed messages in the queue, skipping!`)
} finally {
await admin.disconnect()
}
},
}

/**
 * Computes total/consumed/unconsumed message counts for a topic + consumer
 * group by diffing the topic's latest offsets against the group's committed
 * offsets, summed across all partitions the group has offsets for.
 *
 * @param log - logger used to report fetch failures before rethrowing
 * @param admin - connected Kafka admin client
 * @param topic - topic to inspect
 * @param groupId - consumer group whose committed offsets are compared
 * @throws rethrows any admin-client error after logging it
 */
async function getMessageCounts(
  log: Logger,
  admin: KafkaAdmin,
  topic: string,
  groupId: string,
): Promise<{
  total: number
  consumed: number
  unconsumed: number
}> {
  try {
    const latestOffsets = await admin.fetchTopicOffsets(topic)
    const groupResponse = await admin.fetchOffsets({
      groupId: groupId,
      topics: [topic],
    })

    const result = { total: 0, consumed: 0, unconsumed: 0 }

    for (const partitionOffset of groupResponse[0].partitions) {
      const latest = latestOffsets.find((p) => p.partition === partitionOffset.partition)
      if (!latest) {
        // partition unknown to the topic-offset response — ignore it
        continue
      }

      const end = Number(latest.offset)
      const committed = Number(partitionOffset.offset)

      // latest offset == total messages ever produced to the partition
      result.total += end
      // committed offset == messages the group has consumed
      result.consumed += committed
      // the gap between the two is what is still waiting
      result.unconsumed += end - committed
    }

    return result
  } catch (err) {
    log.error(err, 'Failed to get message count!')
    throw err
  }
}

export default job
Loading
Loading