Skip to content

fix: data sink worker improvements (CM-1054)#3996

Open
themarolt wants to merge 9 commits intomainfrom
fix/data-sink-worker-improvements-CM-1054
Open

fix: data sink worker improvements (CM-1054)#3996
themarolt wants to merge 9 commits intomainfrom
fix/data-sink-worker-improvements-CM-1054

Conversation

@themarolt
Copy link
Copy Markdown
Contributor

@themarolt themarolt commented Apr 3, 2026

Note

Medium Risk
Touches production cron jobs and worker retry/deletion behavior, including deleting incomingWebhooks rows and changing when results are delayed vs marked ERROR, which could impact throughput or data retention if misconfigured. Changes are localized but affect operational reliability paths (Kafka queue depth checks, retries, and cleanup).

Overview
Improves operational resilience for integrations and the data sink pipeline. Adds a new prod-only incoming-webhooks-check daily cron job that cleans up orphaned incomingWebhooks records and re-triggers stuck PENDING webhooks when the integration-stream-worker queue isn’t already backed up.

Adds a prod-only daily integration-results-reporting cron job that posts a Slack summary of integration.results state counts and the top grouped errors (with retry/age/platform context).

Refactors Kafka queue depth calculations by introducing shared getKafkaMessageCounts (handling -1 offsets) and updating existing cron jobs to use it, plus ensures Kafka admin disconnects via try/finally.

Hardens worker behavior: guards against missing activity payloads to avoid batch crashes, prevents identity-conflict delays from bypassing maxStreamRetries, stops re-queuing terminal ERROR results in old-result queries, fixes erasure-identity matching to be platform/case-aware, and deletes processed webhooks (repo API simplification + new batch webhook trigger emitter).

Written by Cursor Bugbot for commit fd0b6cd. This will update automatically on new commits. Configure here.

Copilot AI review requested due to automatic review settings April 3, 2026 07:22
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR improves robustness and operational behavior of the data sink + integration stream workers by preventing certain deterministic crashes, tightening retry/queueing behavior, and adding cron-based monitoring/reporting jobs.

Changes:

  • Stop re-queuing terminal ERROR integration results and ensure identity-conflict delays respect maxStreamRetries.
  • Prevent batch crashes when activity payloads are missing/invalid; fix identity matching to include platform in-memory.
  • Add cron jobs for integration results Slack reporting and for re-triggering stuck pending webhooks; gate existing integration-results check to prod.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
services/libs/data-access-layer/src/old/apps/integration_stream_worker/incomingWebhook.repo.ts Removes “mark processed” behavior and simplifies webhook deletion.
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts Adjusts in-memory identity matching logic to include platform.
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts Excludes ERROR results from “old results to process” queries with rationale.
services/apps/integration_stream_worker/src/service/integrationStreamService.ts Deletes processed webhooks instead of marking them processed.
services/apps/data_sink_worker/src/service/dataSink.service.ts Prevents unbounded retries for identity-conflict delays by respecting retry limit.
services/apps/data_sink_worker/src/service/activity.service.ts Adds guard for missing activity data to avoid crashing whole batches.
services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts New Slack report job summarizing integration results and top error groups.
services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts Runs the job only in prod via enabled.
services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts New prod-only job to re-trigger stuck pending webhooks when queue backlog allows.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
@themarolt themarolt requested review from mbani01 and skwowet and removed request for skwowet April 3, 2026 09:31
Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.

{
pendingState: IntegrationResultState.PENDING,
delayedState: IntegrationResultState.DELAYED,
errorState: IntegrationResultState.ERROR,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified method getOldResultsToProcess has no callers

Low Severity

The getOldResultsToProcess method (without the ForTenant suffix) was modified to remove the ERROR state filter, but a grep across the entire codebase shows it is never called anywhere — only its definition exists. The sibling method getOldResultsToProcessForTenant is the one actually used (in trigger-results-for-tenant.ts). This is dead code that adds maintenance burden.

Fix in Cursor Fix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants