
Add notification replay support for delivery failures#263

Open
sameeragunarathne wants to merge 1 commit into wso2:main from sameeragunarathne:feature-replay-notification

Conversation

Contributor

@sameeragunarathne sameeragunarathne commented Mar 17, 2026

Purpose

This PR adds support for replaying PA notification messages that fail delivery.

Summary by CodeRabbit

Release Notes

  • New Features
    • Added notification replay capability with configurable retry settings for handling failed notifications
    • Implemented persistent message storage to improve notification delivery reliability
    • Introduced optimized notification processing pipeline with preparation and delivery stages
    • Extended configuration options to support replay and dead-letter topic management


coderabbitai bot commented Mar 17, 2026

Walkthrough

This pull request introduces a Kafka-backed message persistence layer and refactors the notification delivery system to use a pipeline architecture with replay support. Configuration adds Kafka settings for a notification replay pipeline, and multiple new dependencies (Kafka, messaging, pipeline, Avro, Confluent packages) are added to support the infrastructure.

Changes

Cohort / File(s) | Summary

  • **Configuration** — fhir-service/Config.toml: Added a [wso2.fhir.service] configuration block with Kafka broker settings, topic names for notifications, failure handling, and replay controls, including max retries, retry interval, and an enable flag.
  • **Dependencies** — fhir-service/Dependencies.toml: Added new packages: ballerina/avro, ballerina/messaging, ballerinax/kafka, ballerinax/confluent.cavroserdes, ballerinax/confluent.cregistry, and xlibb/pipeline. Updated ballerina/health.clients.fhir from 3.0.3 to 3.0.4. Extended service dependencies to include the messaging and Kafka packages.
  • **Message persistence** — fhir-service/message_store.bal: New isolated client class KafkaMessageStore implementing the messaging:Store interface; persists and retrieves messages via Kafka topics using JSON serialization with UUID-based envelope tracking.
  • **Notification delivery** — fhir-service/notification_ops.bal: Refactored sendNotifications to use pipeline-based delivery stages (prepareNotification, deliverNotification); added support for replay-enabled configuration, failure and dead-letter topic handling, and fallback direct-send logic; preserves existing bundle-building and handshake compatibility.
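As a rough illustration of the configuration block described above — the key names mirror the configurables introduced in notification_ops.bal, but the broker address and values below are placeholders, not taken from this PR:

```toml
# Illustrative values only; the broker address and topics are deployment-specific.
[wso2.fhir.service]
kafkaServer = "localhost:9092"
notificationFailureTopic = "pas-notification-failure-store"
notificationDeadLetterTopic = "pas-notification-dead-letter-store"
notificationReplayTopic = "pas-notification-replay-store"
notificationStoreConsumerGroup = "pas-notification-store"
notificationReplayMaxRetries = 3
notificationReplayRetryInterval = 2.0
notificationReplayEnabled = true
```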

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Subscriber as Subscription<br/>Processor
    participant Pipeline as Notification<br/>Pipeline
    participant PrepStage as Prepare Stage<br/>(Build Bundle)
    participant DelivStage as Deliver Stage<br/>(HTTP Send)
    participant KafkaStore as KafkaMessageStore
    participant FailureStore as Failure Topic<br/>Store
    participant ReplayStore as Replay Topic<br/>Store

    Subscriber->>Pipeline: for each subscription
    Pipeline->>PrepStage: pass subscription context
    PrepStage->>PrepStage: buildNotificationBundle()
    PrepStage->>DelivStage: PreparedNotification
    DelivStage->>DelivStage: sendNotificationDirect()

    alt Success
        DelivStage->>Subscriber: delivery successful
    else Delivery Failure
        DelivStage->>FailureStore: store failed message
        DelivStage->>KafkaStore: persist envelope
        opt Replay Enabled
            KafkaStore->>ReplayStore: queue for replay
            ReplayStore->>Pipeline: retry with backoff
        end
    end
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰✨ New pipes of notif flow, through Kafka we go,
Messages stored, replayed when they fail,
Pipeline stages dance, giving delivery a chance,
With bundles and envelopes, no message will pale! 📬🎪

🚥 Pre-merge checks | ✅ 2 passed | ❌ 1 failed

❌ Failed checks (1 warning)

  • Description check — ⚠️ Warning. The pull request description is largely incomplete: only the 'Purpose' section is partially filled, with minimal detail; most required sections (Goals, Approach, User stories, Release note, Documentation, Training, Certification, Marketing, Automation tests, Security checks, Samples, Related PRs, Migrations, Test environment, and Learning) are entirely absent. Resolution: expand the description to include Goals, Approach with implementation details, Automation tests with coverage information, Security checks (secure coding standards, FindSecurityBugs, secrets verification), and at minimum the Documentation and Test environment sections.

✅ Passed checks (2 passed)

  • Title check — ✅ Passed. The title 'Add notification replay support for delivery failures' accurately summarizes the main change: introducing a notification replay pipeline with Kafka-backed message storage and failure handling for PAS notifications.
  • Docstring coverage — ✅ Passed. No functions found in the changed files to evaluate docstring coverage; skipping the check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (1)
fhir-service/notification_ops.bal (1)

47-89: Extract the shared notification preparation step.

sendNotificationDirect() and prepareNotification() both resolve the endpoint, build the bundle, and assemble headers. Pulling that into one helper will keep replay and non-replay delivery from drifting the next time the payload or auth-header logic changes.

Also applies to: 120-152

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@fhir-service/notification_ops.bal` around lines 47 - 89, Create a shared
helper (e.g., prepareNotificationPayload) that encapsulates the common steps
currently duplicated in sendNotificationDirect and prepareNotification: resolve
and validate the subscription endpoint (used instead of inline endpoint checks),
call buildNotificationBundle(subscription, claimResponseId, claimResponse) to
obtain NotificationBundle, and assemble headers including Content-Type and the
optional Authorization from extractAuthHeader(subscription); have the helper
return the endpoint, bundle, and headers (or an error) and update
sendNotificationDirect and prepareNotification to call this helper before
constructing the http:Client and posting so both replay and non-replay delivery
use the same payload/auth assembly logic.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ec42116c-3cc5-40a8-b27f-460aed69490f

📥 Commits

Reviewing files that changed from the base of the PR and between 94fe246 and ed8d584.

📒 Files selected for processing (4)
  • fhir-service/Config.toml
  • fhir-service/Dependencies.toml
  • fhir-service/message_store.bal
  • fhir-service/notification_ops.bal

Comment on lines +73 to +86
```ballerina
isolated remote function retrieve() returns messaging:Message|error? {
    kafka:AnydataConsumerRecord[] records = check self.consumer->poll(1);
    if records.length() == 0 {
        return;
    }

    kafka:AnydataConsumerRecord rec = records[0];
    string raw = check string:fromBytes(<byte[]>rec.value);
    json envelopeJson = check raw.fromJsonString();
    StoredEnvelope envelope = check envelopeJson.cloneWithType(StoredEnvelope);
    return {
        id: envelope.id,
        payload: envelope.payload
    };
```


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Scripts executed (repository: wso2/reference-implementation-cms0057f):

```shell
# Explore the repository structure and find relevant files
find . -type f -name "*.bal" | head -20

# Look at the message_store.bal file to understand context
cat -n fhir-service/message_store.bal | head -100

# Check for documentation or comments about poll() behavior
rg "poll" -A 2 -B 2 --type bal

# Search for Kafka configuration or consumer setup
rg "consumer" --type bal -A 3 -B 1 | head -100

# Check ballerinax/kafka dependencies and documentation
find . -type f \( -name "*.md" -o -name "*.toml" -o -name "Ballerina.toml" \) | xargs grep -l kafka 2>/dev/null

# Check Dependencies.toml for the Kafka version
cat fhir-service/Dependencies.toml

# Search for usage patterns of poll() elsewhere in the codebase
find . -name "*.bal" -exec grep -l "->poll" {} \;

# Locate notification_ops.bal
find . -name "notification_ops.bal"

# Check how the message store is used to understand the intended behavior
grep -r "retrieve()" --include="*.bal" -B 3 -A 3 | head -100

# Inspect the notification pipeline wiring
cat -n fhir-service/notification_ops.bal | head -150

# Verify how many times retrieve() is called to confirm repeated loss
find . -name "*.bal" -type f -exec grep -l "retrieve()" {} \;

# Check for buffering or batching logic that might mitigate this
grep -n "records\[0\]" fhir-service/message_store.bal
```

🌐 Web query: ballerinax kafka consumer poll method signature 4.6.4 documentation

💡 Result: In ballerinax/kafka v4.6.4, the kafka:Consumer remote function poll has this signature:

```
function poll(decimal timeout, typedesc<AnydataConsumerRecord[]> T = <>) returns T|Error
```

  • timeout: polling time in seconds
  • T: optional typedesc (defaults to <>) for the required return type (an array of consumer records)

Source: Ballerina Central docs for ballerinax/kafka v4.6.4 (central.ballerina.io)


Don't drop records returned by poll().

Line 74 polls Kafka with a 1-second timeout, but lines 79–86 process only records[0]. The poll() method can return multiple batched records; any beyond the first are discarded. In the notification failure and replay pipeline context, this causes permanent loss of notifications that should be retried. Either cap the consumer to one record per poll or buffer and drain the entire batch.
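One way to drain the batch is to buffer decoded envelopes and serve subsequent retrieve() calls from that buffer. This is a sketch only — the self.buffered field is a hypothetical addition to KafkaMessageStore, not code from this PR:

```ballerina
// Sketch: buffer the full poll batch so no record is dropped.
// Assumes a hypothetical field `private StoredEnvelope[] buffered = [];`
// added to KafkaMessageStore.
isolated remote function retrieve() returns messaging:Message|error? {
    lock {
        if self.buffered.length() == 0 {
            kafka:AnydataConsumerRecord[] records = check self.consumer->poll(1);
            foreach kafka:AnydataConsumerRecord rec in records {
                string raw = check string:fromBytes(<byte[]>rec.value);
                json envelopeJson = check raw.fromJsonString();
                // Decode every record, not just records[0].
                StoredEnvelope envelope = check envelopeJson.cloneWithType(StoredEnvelope);
                self.buffered.push(envelope);
            }
        }
        if self.buffered.length() == 0 {
            return;
        }
        StoredEnvelope envelope = self.buffered.shift();
        return {id: envelope.id, payload: envelope.payload}.clone();
    }
}
```

Alternatively, if the consumer supports it, capping the batch size via the maxPollRecords consumer option to 1 avoids buffering entirely, at the cost of throughput.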

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@fhir-service/message_store.bal` around lines 73 - 86, The retrieve()
implementation currently calls self.consumer->poll(...) and only processes
records[0], dropping the rest; change it to drain the entire batch returned by
consumer->poll by iterating over kafka:AnydataConsumerRecord[] records and
decoding each record into StoredEnvelope (using the existing
string/fromJson/cloneWithType logic), enqueueing or buffering every decoded
envelope for later processing, and returning the first envelope as the current
messaging:Message (or otherwise adjust the function to return a batch if
intended); also ensure you acknowledge/commit or advance offsets for all
processed records so none are re-delivered. Reference symbols: retrieve(),
self.consumer->poll(), records, rec, StoredEnvelope.

Comment on lines +89 to +91
```ballerina
isolated remote function acknowledge(string id, boolean success = true) returns error? {
    // No-op acknowledge; offsets are managed by the Kafka consumer configuration.
    return;
```


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Scripts executed (repository: wso2/reference-implementation-cms0057f):

```shell
# Locate the message store and notification files
find . -type f -name "*.bal" | grep -E "(message_store|notification_ops)" | head -20

# Inspect the message store implementation
wc -l ./fhir-service/message_store.bal
cat -n ./fhir-service/message_store.bal

# Inspect the store wiring in the notification pipeline
wc -l ./fhir-service/notification_ops.bal
sed -n '92,118p' ./fhir-service/notification_ops.bal
```


acknowledge() must commit offsets on successful message processing.

Line 54 disables auto-commit in the Kafka consumer, but acknowledge() is a no-op that ignores the success flag. This means offsets are never committed, so on restart or rebalance, already-handled messages will be redelivered. Since this store is wired into failureStore, replayStore, and deadLetterStore (notification_ops.bal:92-118), the replay pipeline cannot maintain durable state and will reprocess duplicate messages indefinitely.
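A minimal sketch of committing on success — assuming the store's existing self.consumer instance; failure-path seek/retry handling is intentionally left out:

```ballerina
// Sketch: commit consumed offsets once the caller reports success.
isolated remote function acknowledge(string id, boolean success = true) returns error? {
    if success {
        // Durably commit the offsets of the records consumed so far,
        // so they are not redelivered after a restart or rebalance.
        check self.consumer->commit();
    }
    // On failure, skip the commit; the uncommitted records will be
    // redelivered, which is what lets the replay pipeline retry them.
    return;
}
```

Note that a blanket commit() acknowledges everything polled so far, not just the record matching id; correlating individual ids to offsets would need per-record commitOffset bookkeeping.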

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@fhir-service/message_store.bal` around lines 89 - 91, The acknowledge(string
id, boolean success = true) function is currently a no-op but must commit Kafka
offsets on successful processing; update acknowledge to, when success is true,
call the Kafka consumer commit API for the record(s) correlated with id (use
your consumer instance used elsewhere in message_store.bal) so offsets are
durably committed, and when success is false avoid committing (or optionally
seek to reprocess) to allow retries; use the existing consumer variable and its
commit/seek methods inside acknowledge(string id, boolean success = true) rather
than returning immediately.

Comment on lines +26 to +33
```ballerina
configurable string kafkaServer = ?;
configurable string notificationFailureTopic = "pas-notification-failure-store";
configurable string notificationDeadLetterTopic = "pas-notification-dead-letter-store";
configurable string notificationReplayTopic = "pas-notification-replay-store";
configurable string notificationStoreConsumerGroup = "pas-notification-store";
configurable int notificationReplayMaxRetries = 3;
configurable decimal notificationReplayRetryInterval = 2;
configurable boolean notificationReplayEnabled = false;
```


⚠️ Potential issue | 🟠 Major

Make Kafka config optional until replay is enabled.

Line 26 makes kafkaServer mandatory even though notificationReplayEnabled defaults to false. That forces existing non-replay deployments to add Kafka config just to keep the old path working. Validate the broker setting only inside the replay-enabled branch instead.
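A sketch of the suggested change — the initReplayPipeline function name is hypothetical; only the optional configurable and the replay-branch validation matter:

```ballerina
// Sketch: make the broker optional and validate it only when replay is on.
configurable string? kafkaServer = ();
configurable boolean notificationReplayEnabled = false;

function initReplayPipeline() returns error? {
    if !notificationReplayEnabled {
        // Non-replay deployments need no Kafka configuration at all.
        return;
    }
    string? broker = kafkaServer;
    if broker is () || broker == "" {
        return error("kafkaServer must be configured when notificationReplayEnabled is true");
    }
    // ... create Kafka producers/consumers against `broker` here ...
}
```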

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@fhir-service/notification_ops.bal` around lines 26 - 33, kafkaServer is being
treated as required even when notificationReplayEnabled is false; make the
broker config optional and only validate/require it when replay is actually
enabled by moving the existence check into the replay-enabled branch. Update the
configurable declaration for kafkaServer to be optional (keep its name
kafkaServer) and remove any top-level validation, then inside the replay
startup/initialization code that references notificationReplayEnabled (the
branch that performs replay setup/consumer creation) check that kafkaServer is
present and error if missing before creating consumers/producers (references:
kafkaServer, notificationReplayEnabled, notificationReplayEnabled branch /
replay init code).

Comment on lines +187 to 230
```ballerina
public isolated function sendNotifications(
        fhirClient:FHIRConnector fhirConnector,
        string claimResponseId,
        string organizationId,
        international401:ClaimResponse claimResponse
) returns error? {

    log:printInfo(string `Sending notifications for ClaimResponse ${claimResponseId}`);

    // Get active subscriptions
    international401:Subscription[] subscriptions =
        check getActiveSubscriptionsByOrg(fhirConnector, organizationId);

    if subscriptions.length() == 0 {
        log:printWarn(string `No active subscriptions for org ${organizationId}`);
        return;
    }

    // Send notification to each subscription
    foreach international401:Subscription sub in subscriptions {
        pipeline:HandlerChain? maybePipeline = notificationPipeline;
        if maybePipeline is pipeline:HandlerChain {
            pipeline:HandlerChain hc = maybePipeline;
            NotificationDispatch dispatch = {
                subscription: sub,
                claimResponseId,
                claimResponse
            };

            pipeline:ExecutionSuccess|pipeline:ExecutionError execResult = hc.execute(dispatch);
            if execResult is pipeline:ExecutionError {
                log:printError(
                    string `Failed to send notification to ${sub.id ?: "unknown"} (stored for replay)`,
                    'error = execResult
                );
            }
        } else {
            error? result = sendNotificationDirect(sub, claimResponseId, claimResponse);
            if result is error {
                log:printError(string `Failed to send notification to ${sub.id ?: "unknown"}: ${result.message()}`);
            }
        }
    }
}
```


⚠️ Potential issue | 🟡 Minor

Decide whether sendNotifications() is best-effort or fail-fast.

Lines 216-227 only log delivery failures, so this function returns () even when every notification fails. Either aggregate and return an error, or make the best-effort contract explicit in the API/docs.
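If fail-fast is the chosen contract, the loop body could aggregate failures along these lines — a sketch of the direct-send branch only, not the author's implementation:

```ballerina
// Sketch: collect per-subscription failures and surface them to the caller.
string[] failures = [];
foreach international401:Subscription sub in subscriptions {
    error? result = sendNotificationDirect(sub, claimResponseId, claimResponse);
    if result is error {
        string subId = sub.id ?: "unknown";
        log:printError(string `Failed to send notification to ${subId}: ${result.message()}`);
        failures.push(string `${subId}: ${result.message()}`);
    }
}
if failures.length() > 0 {
    return error(string `Notification delivery failed for ${failures.length()} ` +
        string `subscription(s) on ClaimResponse ${claimResponseId}: ${", ".join(...failures)}`);
}
```

If best-effort is preferred instead, a doc comment on sendNotifications stating that individual delivery errors never fail the call would make the contract explicit.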

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@fhir-service/notification_ops.bal` around lines 187 - 230, Decide and
implement a clear contract for sendNotifications: prefer aggregating delivery
failures and returning an error (since signature already returns error?). To do
this, in sendNotifications collect failures into a list (e.g., errors = []),
when pipeline execution yields pipeline:ExecutionError or sendNotificationDirect
returns error, push a contextual error (include subscription id via sub.id and
claimResponseId) into errors instead of only logging; after the foreach, if
errors is non-empty return a composed error (or the first error with joined
messages); alternatively if you want best-effort keep current behavior but add a
doc comment on sendNotifications clarifying it never fails on individual
delivery errors and keep the logging as-is. Ensure to update uses of execResult,
result, sendNotificationDirect, NotificationDispatch and notificationPipeline
handling accordingly.

