Add notification replay support for delivery failures #263
sameeragunarathne wants to merge 1 commit into wso2:main from
Conversation
Walkthrough
This pull request introduces a Kafka-backed message persistence layer and refactors the notification delivery system to use a pipeline architecture with replay support. Configuration adds Kafka settings for a notification replay pipeline, and multiple new dependencies (Kafka, messaging, pipeline, Avro, Confluent packages) are added to support the infrastructure.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Subscriber as Subscription<br/>Processor
    participant Pipeline as Notification<br/>Pipeline
    participant PrepStage as Prepare Stage<br/>(Build Bundle)
    participant DelivStage as Deliver Stage<br/>(HTTP Send)
    participant KafkaStore as KafkaMessageStore
    participant FailureStore as Failure Topic<br/>Store
    participant ReplayStore as Replay Topic<br/>Store
    Subscriber->>Pipeline: for each subscription
    Pipeline->>PrepStage: pass subscription context
    PrepStage->>PrepStage: buildNotificationBundle()
    PrepStage->>DelivStage: PreparedNotification
    DelivStage->>DelivStage: sendNotificationDirect()
    alt Success
        DelivStage->>Subscriber: delivery successful
    else Delivery Failure
        DelivStage->>FailureStore: store failed message
        DelivStage->>KafkaStore: persist envelope
        opt Replay Enabled
            KafkaStore->>ReplayStore: queue for replay
            ReplayStore->>Pipeline: retry with backoff
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
🧹 Nitpick comments (1)
fhir-service/notification_ops.bal (1)
47-89: Extract the shared notification preparation step.
sendNotificationDirect() and prepareNotification() both resolve the endpoint, build the bundle, and assemble headers. Pulling that into one helper will keep replay and non-replay delivery from drifting the next time the payload or auth-header logic changes. Also applies to: 120-152
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@fhir-service/notification_ops.bal` around lines 47 - 89, Create a shared helper (e.g., prepareNotificationPayload) that encapsulates the common steps currently duplicated in sendNotificationDirect and prepareNotification: resolve and validate the subscription endpoint (used instead of inline endpoint checks), call buildNotificationBundle(subscription, claimResponseId, claimResponse) to obtain NotificationBundle, and assemble headers including Content-Type and the optional Authorization from extractAuthHeader(subscription); have the helper return the endpoint, bundle, and headers (or an error) and update sendNotificationDirect and prepareNotification to call this helper before constructing the http:Client and posting so both replay and non-replay delivery use the same payload/auth assembly logic.
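A minimal sketch of what that shared helper could look like. The `PreparedPayload` record, the endpoint-resolution path, and the exact signatures of `buildNotificationBundle` and `extractAuthHeader` are assumptions inferred from the review comment, not code from the PR:

```ballerina
// Illustrative record; the name and shape are assumptions, not from the PR.
type PreparedPayload record {|
    string endpoint;
    json bundle;
    map<string> headers;
|};

isolated function prepareNotificationPayload(international401:Subscription subscription,
        string claimResponseId, international401:ClaimResponse claimResponse)
        returns PreparedPayload|error {
    // Resolve and validate the subscription endpoint once, for both paths
    // (the channel?.endpoint access path is assumed here).
    string? endpoint = subscription.channel?.endpoint;
    if endpoint is () || endpoint == "" {
        return error("Subscription has no channel endpoint");
    }
    // Build the bundle and assemble headers exactly once.
    json bundle = (check buildNotificationBundle(subscription, claimResponseId, claimResponse)).toJson();
    map<string> headers = {"Content-Type": "application/fhir+json"};
    string? authHeader = extractAuthHeader(subscription);
    if authHeader is string {
        headers["Authorization"] = authHeader;
    }
    return {endpoint, bundle, headers};
}
```

Both sendNotificationDirect and prepareNotification would then call this helper before constructing the http:Client, so the payload and auth-header assembly cannot drift between the replay and non-replay paths.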
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@fhir-service/message_store.bal`:
- Around line 89-91: The acknowledge(string id, boolean success = true) function
is currently a no-op but must commit Kafka offsets on successful processing;
update acknowledge to, when success is true, call the Kafka consumer commit API
for the record(s) correlated with id (use your consumer instance used elsewhere
in message_store.bal) so offsets are durably committed, and when success is
false avoid committing (or optionally seek to reprocess) to allow retries; use
the existing consumer variable and its commit/seek methods inside
acknowledge(string id, boolean success = true) rather than returning
immediately.
- Around line 73-86: The retrieve() implementation currently calls
self.consumer->poll(...) and only processes records[0], dropping the rest;
change it to drain the entire batch returned by consumer->poll by iterating over
kafka:AnydataConsumerRecord[] records and decoding each record into
StoredEnvelope (using the existing string/fromJson/cloneWithType logic),
enqueueing or buffering every decoded envelope for later processing, and
returning the first envelope as the current messaging:Message (or otherwise
adjust the function to return a batch if intended); also ensure you
acknowledge/commit or advance offsets for all processed records so none are
re-delivered. Reference symbols: retrieve(), self.consumer->poll(), records,
rec, StoredEnvelope.
In `@fhir-service/notification_ops.bal`:
- Around line 26-33: kafkaServer is being treated as required even when
notificationReplayEnabled is false; make the broker config optional and only
validate/require it when replay is actually enabled by moving the existence
check into the replay-enabled branch. Update the configurable declaration for
kafkaServer to be optional (keep its name kafkaServer) and remove any top-level
validation, then inside the replay startup/initialization code that references
notificationReplayEnabled (the branch that performs replay setup/consumer
creation) check that kafkaServer is present and error if missing before creating
consumers/producers (references: kafkaServer, notificationReplayEnabled,
notificationReplayEnabled branch / replay init code).
- Around line 187-230: Decide and implement a clear contract for
sendNotifications: prefer aggregating delivery failures and returning an error
(since signature already returns error?). To do this, in sendNotifications
collect failures into a list (e.g., errors = []), when pipeline execution yields
pipeline:ExecutionError or sendNotificationDirect returns error, push a
contextual error (include subscription id via sub.id and claimResponseId) into
errors instead of only logging; after the foreach, if errors is non-empty return
a composed error (or the first error with joined messages); alternatively if you
want best-effort keep current behavior but add a doc comment on
sendNotifications clarifying it never fails on individual delivery errors and
keep the logging as-is. Ensure to update uses of execResult, result,
sendNotificationDirect, NotificationDispatch and notificationPipeline handling
accordingly.
---
Nitpick comments:
In `@fhir-service/notification_ops.bal`:
- Around line 47-89: Create a shared helper (e.g., prepareNotificationPayload)
that encapsulates the common steps currently duplicated in
sendNotificationDirect and prepareNotification: resolve and validate the
subscription endpoint (used instead of inline endpoint checks), call
buildNotificationBundle(subscription, claimResponseId, claimResponse) to obtain
NotificationBundle, and assemble headers including Content-Type and the optional
Authorization from extractAuthHeader(subscription); have the helper return the
endpoint, bundle, and headers (or an error) and update sendNotificationDirect
and prepareNotification to call this helper before constructing the http:Client
and posting so both replay and non-replay delivery use the same payload/auth
assembly logic.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ec42116c-3cc5-40a8-b27f-460aed69490f
📒 Files selected for processing (4)
- fhir-service/Config.toml
- fhir-service/Dependencies.toml
- fhir-service/message_store.bal
- fhir-service/notification_ops.bal
```ballerina
isolated remote function retrieve() returns messaging:Message|error? {
    kafka:AnydataConsumerRecord[] records = check self.consumer->poll(1);
    if records.length() == 0 {
        return;
    }

    kafka:AnydataConsumerRecord rec = records[0];
    string raw = check string:fromBytes(<byte[]>rec.value);
    json envelopeJson = check raw.fromJsonString();
    StoredEnvelope envelope = check envelopeJson.cloneWithType(StoredEnvelope);
    return {
        id: envelope.id,
        payload: envelope.payload
    };
}
```
🧩 Analysis chain
The reviewer's verification scripts searched the repository (reading fhir-service/message_store.bal and fhir-service/notification_ops.bal, checking Dependencies.toml for the Kafka version, and locating every retrieve() and ->poll call site) and consulted the ballerinax/kafka v4.6.4 documentation, which gives the consumer poll signature as `function poll(decimal timeout, typedesc<AnydataConsumerRecord[]> T = <>) returns T|Error`, where timeout is the polling time in seconds and the return type is an array of consumer records — confirming that a single poll can return more than one record.
Don't drop records returned by poll().
Line 74 polls Kafka with a 1-second timeout, but lines 79–86 process only records[0]. The poll() method can return multiple batched records; any beyond the first are discarded. In the notification failure and replay pipeline context, this causes permanent loss of notifications that should be retried. Either cap the consumer to one record per poll or buffer and drain the entire batch.
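One way the batch could be drained, as a hedged sketch: the `self.pending` queue field is hypothetical (it would need to be added to the store, with access serialized by the store's own synchronization), and the envelope decoding reuses the logic already shown in the snippet above.

```ballerina
isolated remote function retrieve() returns messaging:Message|error? {
    kafka:AnydataConsumerRecord[] records = check self.consumer->poll(1);
    if records.length() == 0 {
        return;
    }
    messaging:Message? first = ();
    foreach kafka:AnydataConsumerRecord rec in records {
        string raw = check string:fromBytes(<byte[]>rec.value);
        json envelopeJson = check raw.fromJsonString();
        StoredEnvelope envelope = check envelopeJson.cloneWithType(StoredEnvelope);
        if first is () {
            first = {id: envelope.id, payload: envelope.payload};
        } else {
            // Hypothetical buffer field: keep the rest of the batch so later
            // retrieve() calls serve it instead of silently dropping it.
            self.pending.push({id: envelope.id, payload: envelope.payload});
        }
    }
    return first;
}
```

Alternatively, if the consumer configuration supports capping the batch size (e.g. a max-poll-records setting), limiting each poll to one record keeps the current single-record logic correct at the cost of throughput.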
```ballerina
isolated remote function acknowledge(string id, boolean success = true) returns error? {
    // No-op acknowledge; offsets are managed by the Kafka consumer configuration.
    return;
}
```
🧩 Analysis chain
The reviewer's verification scripts located and read fhir-service/message_store.bal in full and inspected lines 92-118 of fhir-service/notification_ops.bal before raising this finding.
acknowledge() must commit offsets on successful message processing.
Line 54 disables auto-commit in the Kafka consumer, but acknowledge() is a no-op that ignores the success flag. This means offsets are never committed, so on restart or rebalance, already-handled messages will be redelivered. Since this store is wired into failureStore, replayStore, and deadLetterStore (notification_ops.bal:92-118), the replay pipeline cannot maintain durable state and will reprocess duplicate messages indefinitely.
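A hedged sketch of a committing acknowledge using ballerinax/kafka. Note that `commit` is a reserved word in Ballerina, hence the quoted method name; also, committing everything polled so far is a simplification — correlating the individual `id` to a specific partition offset would need extra bookkeeping not shown here:

```ballerina
isolated remote function acknowledge(string id, boolean success = true) returns error? {
    if success {
        // Durably commit the offsets of the records polled so far, so they
        // are not redelivered after a restart or rebalance.
        check self.consumer->'commit();
        return;
    }
    // On failure, skip the commit; the uncommitted records will be
    // redelivered on a later poll, allowing the retry path to run.
}
```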
```ballerina
configurable string kafkaServer = ?;
configurable string notificationFailureTopic = "pas-notification-failure-store";
configurable string notificationDeadLetterTopic = "pas-notification-dead-letter-store";
configurable string notificationReplayTopic = "pas-notification-replay-store";
configurable string notificationStoreConsumerGroup = "pas-notification-store";
configurable int notificationReplayMaxRetries = 3;
configurable decimal notificationReplayRetryInterval = 2;
configurable boolean notificationReplayEnabled = false;
```
Make Kafka config optional until replay is enabled.
Line 26 makes kafkaServer mandatory even though notificationReplayEnabled defaults to false. That forces existing non-replay deployments to add Kafka config just to keep the old path working. Validate the broker setting only inside the replay-enabled branch instead.
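A sketch of the suggested shape, assuming the replay setup lives in an init-style function — the `initReplayPipeline` name is illustrative, not from the PR:

```ballerina
// Optional broker: non-replay deployments can omit it entirely.
configurable string? kafkaServer = ();
configurable boolean notificationReplayEnabled = false;

function initReplayPipeline() returns error? {
    if !notificationReplayEnabled {
        return; // old path: no Kafka config required
    }
    string? server = kafkaServer;
    if server is () {
        return error("kafkaServer must be configured when notificationReplayEnabled is true");
    }
    // ... create the Kafka producers/consumers against `server` here ...
}
```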
```ballerina
public isolated function sendNotifications(
        fhirClient:FHIRConnector fhirConnector,
        string claimResponseId,
        string organizationId,
        international401:ClaimResponse claimResponse
) returns error? {

    log:printInfo(string `Sending notifications for ClaimResponse ${claimResponseId}`);

    // Get active subscriptions
    international401:Subscription[] subscriptions =
        check getActiveSubscriptionsByOrg(fhirConnector, organizationId);

    if subscriptions.length() == 0 {
        log:printWarn(string `No active subscriptions for org ${organizationId}`);
        return;
    }

    // Send notification to each subscription
    foreach international401:Subscription sub in subscriptions {
        pipeline:HandlerChain? maybePipeline = notificationPipeline;
        if maybePipeline is pipeline:HandlerChain {
            pipeline:HandlerChain hc = maybePipeline;
            NotificationDispatch dispatch = {
                subscription: sub,
                claimResponseId,
                claimResponse
            };

            pipeline:ExecutionSuccess|pipeline:ExecutionError execResult = hc.execute(dispatch);
            if execResult is pipeline:ExecutionError {
                log:printError(
                    string `Failed to send notification to ${sub.id ?: "unknown"} (stored for replay)`,
                    'error = execResult
                );
            }
        } else {
            error? result = sendNotificationDirect(sub, claimResponseId, claimResponse);
            if result is error {
                log:printError(string `Failed to send notification to ${sub.id ?: "unknown"}: ${result.message()}`);
            }
        }
    }
}
```
Decide whether sendNotifications() is best-effort or fail-fast.
Lines 216-227 only log delivery failures, so this function returns () even when every notification fails. Either aggregate and return an error, or make the best-effort contract explicit in the API/docs.
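If the aggregate-and-fail contract is chosen, the loop body could collect failures along these lines — a sketch of the shape only, reusing the names from the snippet above:

```ballerina
string[] failures = [];
foreach international401:Subscription sub in subscriptions {
    // ... existing pipeline / direct dispatch as shown in the diff ...
    if execResult is pipeline:ExecutionError {
        failures.push(string `subscription ${sub.id ?: "unknown"}: ${execResult.message()}`);
    }
}
// After the loop, surface every delivery failure as one composed error.
if failures.length() > 0 {
    return error(string `Delivery failed for ClaimResponse ${claimResponseId}: ${string:'join("; ", ...failures)}`);
}
```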
Purpose
Summary by CodeRabbit
Release Notes