fix: data sink worker improvements (CM-1054)#3996
Conversation
…xed a few bugs in dsw Signed-off-by: Uroš Marolt <uros@marolt.me>
There was a problem hiding this comment.
Pull request overview
This PR improves robustness and operational behavior of the data sink + integration stream workers by preventing certain deterministic crashes, tightening retry/queueing behavior, and adding cron-based monitoring/reporting jobs.
Changes:
- Stop re-queuing terminal
ERRORintegration results and ensure identity-conflict delays respectmaxStreamRetries. - Prevent batch crashes when activity payloads are missing/invalid; fix identity matching to include platform in-memory.
- Add cron jobs for integration results Slack reporting and for re-triggering stuck pending webhooks; gate existing integration-results check to prod.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| services/libs/data-access-layer/src/old/apps/integration_stream_worker/incomingWebhook.repo.ts | Removes “mark processed” behavior and simplifies webhook deletion. |
| services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts | Adjusts in-memory identity matching logic to include platform. |
| services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts | Excludes ERROR results from “old results to process” queries with rationale. |
| services/apps/integration_stream_worker/src/service/integrationStreamService.ts | Deletes processed webhooks instead of marking them processed. |
| services/apps/data_sink_worker/src/service/dataSink.service.ts | Prevents unbounded retries for identity-conflict delays by respecting retry limit. |
| services/apps/data_sink_worker/src/service/activity.service.ts | Adds guard for missing activity data to avoid crashing whole batches. |
| services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts | New Slack report job summarizing integration results and top error groups. |
| services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts | Runs the job only in prod via enabled. |
| services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts | New prod-only job to re-trigger stuck pending webhooks when queue backlog allows. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts
Outdated
Show resolved
Hide resolved
...-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts
Show resolved
Hide resolved
services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts
Outdated
Show resolved
Hide resolved
services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts
Outdated
Show resolved
Hide resolved
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
...-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts
Outdated
Show resolved
Hide resolved
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
...-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts
Show resolved
Hide resolved
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
| { | ||
| pendingState: IntegrationResultState.PENDING, | ||
| delayedState: IntegrationResultState.DELAYED, | ||
| errorState: IntegrationResultState.ERROR, |
There was a problem hiding this comment.
Modified method getOldResultsToProcess has no callers
Low Severity
The getOldResultsToProcess method (without the ForTenant suffix) was modified to remove the ERROR state filter, but a grep across the entire codebase shows it is never called anywhere — only its definition exists. The sibling method getOldResultsToProcessForTenant is the one actually used (in trigger-results-for-tenant.ts). This is dead code that adds maintenance burden.


Note
Medium Risk
Touches production cron jobs and worker retry/deletion behavior, including deleting
incomingWebhooksrows and changing when results are delayed vs markedERROR, which could impact throughput or data retention if misconfigured. Changes are localized but affect operational reliability paths (Kafka queue depth checks, retries, and cleanup).Overview
Improves operational resilience for integrations and the data sink pipeline. Adds a new prod-only
incoming-webhooks-checkdaily cron job that cleans up orphanedincomingWebhooksrecords and re-triggers stuckPENDINGwebhooks when the integration-stream-worker queue isn’t already backed up.Adds a prod-only daily
integration-results-reportingcron job that posts a Slack summary ofintegration.resultsstate counts and the top grouped errors (with retry/age/platform context).Refactors Kafka queue depth calculations by introducing shared
getKafkaMessageCounts(handling-1offsets) and updating existing cron jobs to use it, plus ensures Kafka admin disconnects viatry/finally.Hardens worker behavior: guards against missing activity payloads to avoid batch crashes, prevents identity-conflict delays from bypassing
maxStreamRetries, stops re-queuing terminalERRORresults in old-result queries, fixes erasure-identity matching to be platform/case-aware, and deletes processed webhooks (repo API simplification + new batch webhook trigger emitter).Written by Cursor Bugbot for commit fd0b6cd. This will update automatically on new commits. Configure here.