Conversation
📝 Walkthrough

Bumps the health-check version from 0.0.8 to 0.0.9 and adds KafkaJS-based topic deletion to the Kafka health check service. Updates topic lifecycle management, including creation logging, payload construction format, and consumer initialization, with topic cleanup added to the post-check workflow.

Changes
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs
🚥 Pre-merge checks | ✅ Passed checks (3 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
health-check/services/kafka.js (1)
105-122: ⚠️ Potential issue | 🟠 Major

Cleanup awaits topic deletion, blocking health check resolution.
The `cleanup` function now awaits `deleteTopic()` before calling `resolve(val)`. If topic deletion hangs (network issues, unresponsive broker), the health check will never complete. Since health checks are typically time-sensitive, consider making topic deletion fire-and-forget or applying a timeout.

♻️ Option 1: Fire-and-forget deletion (non-blocking)

```diff
 const cleanup = async (val) => {
   if (resolved) return
   resolved = true
   try {
     if (consumer) consumer.close(true)
     if (client) client.close()
     // Delete topic after successful health check
     if (val === true) {
-      await deleteTopic(kafkaUrl, uniqueTopicName)
+      // Fire-and-forget: don't block health check result
+      deleteTopic(kafkaUrl, uniqueTopicName).catch(() => {})
     }
   } catch (e) {
     if (DEBUG_MODE) {
       console.error('[Kafka Health Check] Cleanup error:', e.message)
     }
   }
   resolve(val)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@health-check/services/kafka.js` around lines 105 - 122, The cleanup function blocks health-check resolution by awaiting deleteTopic; change it to fire-and-forget so resolve(val) is called immediately: in cleanup (the resolved guard, consumer.close, client.close logic), remove the await before deleteTopic(kafkaUrl, uniqueTopicName), instead call deleteTopic(...) without awaiting and attach a .catch(...) to log errors (use DEBUG_MODE and e.message) so deletion failures don't block resolve(val); keep resolve(val) as the last step in cleanup so the health check always finishes promptly.
🧹 Nitpick comments (2)
health-check/services/kafka.js (2)
95-95: Async Promise executor anti-pattern (pre-existing).
`new Promise(async (resolve) => {...})` is an anti-pattern: exceptions thrown in the async executor won't reject the outer Promise. While the current code handles errors via try/catch and `cleanup(false)`, consider refactoring to a plain async function in a future cleanup.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@health-check/services/kafka.js` at line 95, The Promise is created with an async executor (return new Promise(async (resolve) => { ... })), which is an anti-pattern; remove the async from the executor and either (A) make the executor non-async and use explicit .then/.catch or try/catch with Promise-returning calls and call resolve/reject accordingly, or (B) refactor the surrounding function into an async function that uses await and returns directly instead of wrapping in new Promise. Locate the promise creation in health-check/services/kafka.js (the block that calls cleanup(false) and uses resolve) and implement one of these fixes so thrown errors are properly propagated and handled.
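Refactor option (B) from the prompt above can be sketched as follows. This is an illustrative stand-in, not the file's actual check logic: the function bodies are simplified to a single throw so the error-propagation difference is visible.

```javascript
// Anti-pattern: a throw inside the async executor is swallowed, so the
// outer Promise never settles (it neither resolves nor rejects).
function checkWithAsyncExecutor() {
  return new Promise(async (resolve) => {
    throw new Error('broker unreachable') // lost: outer Promise stays pending
  })
}

// Refactor: a plain async function turns the same throw into a rejection,
// which a caller can catch and map to a failed health check.
async function checkAsAsyncFunction() {
  throw new Error('broker unreachable') // propagates as a rejection
}
```

With the plain async function, the caller can simply `await` it inside try/catch and return `false` on failure, which is the behavior the current `cleanup(false)` path emulates manually.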
8-9: Consider consolidating to a single Kafka client library.

The module now uses both `kafka-node` (for producer/consumer) and `kafkajs` (for admin). While this works, maintaining two client libraries increases complexity and dependency footprint. If feasible in the future, consider migrating entirely to KafkaJS, which supports all required operations.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@health-check/services/kafka.js` around lines 8 - 9, The file currently requires two clients—const kafka = require('kafka-node') and const { Kafka } = require('kafkajs')—which should be consolidated; remove the kafka-node require and refactor any code that uses the kafka variable (producer/consumer creation and methods) to use KafkaJS equivalents (instantiate new Kafka({...}), then use kafka.producer(), kafka.consumer(), and kafka.admin()) so the module only depends on the Kafka class from kafkajs; update variable names and method calls accordingly to match KafkaJS APIs where producer/consumer/admin are used.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@health-check/services/kafka.js`:
- Around line 59-89: The deleteTopic function performs admin.connect(),
admin.deleteTopics(), and admin.disconnect() without timeouts which can hang;
update deleteTopic to enforce timeouts by either passing KafkaJS
connection/request timeout options when constructing the Kafka client (e.g., set
connectionTimeout/requestTimeout) or wrap each admin call (admin.connect,
admin.deleteTopics, admin.disconnect) in a promise-race with a configurable
timeout and ensure admin.disconnect is attempted in finally; reference the
deleteTopic function and the admin.connect/admin.deleteTopics/admin.disconnect
calls when applying the timeout logic and return false if any call times out.
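The promise-race variant suggested above can be sketched as below. `withTimeout`, its default label, and the millisecond values are illustrative assumptions, not names from the codebase.

```javascript
// Race a promise against a timer; reject if the timer fires first.
function withTimeout(promise, ms, label = 'operation') {
  let timer
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${ms}ms`)),
      ms
    )
  })
  // Whichever settles first wins; always clear the timer to avoid leaks.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}
```

Usage would look like `await withTimeout(admin.connect(), 5000, 'admin.connect')` for each admin call, with `admin.disconnect()` still attempted in a `finally` block and the function returning `false` on any timeout.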
---
Outside diff comments:
In `@health-check/services/kafka.js`:
- Around line 105-122: The cleanup function blocks health-check resolution by
awaiting deleteTopic; change it to fire-and-forget so resolve(val) is called
immediately: in cleanup (the resolved guard, consumer.close, client.close
logic), remove the await before deleteTopic(kafkaUrl, uniqueTopicName), instead
call deleteTopic(...) without awaiting and attach a .catch(...) to log errors
(use DEBUG_MODE and e.message) so deletion failures don't block resolve(val);
keep resolve(val) as the last step in cleanup so the health check always
finishes promptly.
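The fire-and-forget shape described above can be sketched like this. `cleanupSketch` and `deleteTopicFn` are illustrative stand-ins for the real `cleanup` and `deleteTopic(kafkaUrl, uniqueTopicName)`:

```javascript
// resolve(val) runs synchronously; deletion happens in the background.
function cleanupSketch(resolve, val, deleteTopicFn) {
  if (val === true) {
    // No await: a slow or failing deletion can no longer delay the result.
    deleteTopicFn().catch((e) => {
      console.error('[Kafka Health Check] Topic deletion failed:', e.message)
    })
  }
  resolve(val) // last step, so the health check always settles promptly
}
```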
---
Nitpick comments:
In `@health-check/services/kafka.js`:
- Line 95: The Promise is created with an async executor (return new
Promise(async (resolve) => { ... })), which is an anti-pattern; remove the async
from the executor and either (A) make the executor non-async and use explicit
.then/.catch or try/catch with Promise-returning calls and call resolve/reject
accordingly, or (B) refactor the surrounding function into an async function
that uses await and returns directly instead of wrapping in new Promise. Locate
the promise creation in health-check/services/kafka.js (the block that calls
cleanup(false) and uses resolve) and implement one of these fixes so thrown
errors are properly propagated and handled.
- Around line 8-9: The file currently requires two clients—const kafka =
require('kafka-node') and const { Kafka } = require('kafkajs')—which should be
consolidated; remove the kafka-node require and refactor any code that uses the
kafka variable (producer/consumer creation and methods) to use KafkaJS
equivalents (instantiate new Kafka({...}), then use kafka.producer(),
kafka.consumer(), and kafka.admin()) so the module only depends on the Kafka
class from kafkajs; update variable names and method calls accordingly to match
KafkaJS APIs where producer/consumer/admin are used.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9e1198e4-5d7a-4e9f-8c94-2c18577ca3cd
📒 Files selected for processing (2)
- health-check/package.json
- health-check/services/kafka.js
```js
async function deleteTopic(kafkaUrl, topicName) {
  try {
    const kafkaClient = new Kafka({
      brokers: [kafkaUrl],
    })
    const admin = kafkaClient.admin()
    await admin.connect()
    await admin.deleteTopics({
      topics: [topicName],
    })
    await admin.disconnect()
    if (DEBUG_MODE) {
      console.log(`[Kafka Health Check] Topic '${topicName}' deleted`)
    }
    return true
  } catch (error) {
    if (DEBUG_MODE) {
      console.error(
        `[Kafka Health Check] Failed to delete topic '${topicName}':`,
        error.message
      )
    }
    return false
  }
}
```
Admin operations lack timeout, risking indefinite hang.
The admin.connect(), admin.deleteTopics(), and admin.disconnect() calls have no timeout. If the broker is unresponsive during cleanup, this will block indefinitely, causing the health check to never resolve.
Consider adding a timeout wrapper or using KafkaJS connection timeout options:
🛡️ Proposed fix with timeout
```diff
 async function deleteTopic(kafkaUrl, topicName) {
   try {
     const kafkaClient = new Kafka({
       brokers: [kafkaUrl],
+      connectionTimeout: 5000,
+      requestTimeout: 10000,
     })
     const admin = kafkaClient.admin()
     await admin.connect()
     await admin.deleteTopics({
       topics: [topicName],
+      timeout: 5000,
     })
     await admin.disconnect()
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```js
async function deleteTopic(kafkaUrl, topicName) {
  try {
    const kafkaClient = new Kafka({
      brokers: [kafkaUrl],
      connectionTimeout: 5000,
      requestTimeout: 10000,
    })
    const admin = kafkaClient.admin()
    await admin.connect()
    await admin.deleteTopics({
      topics: [topicName],
      timeout: 5000,
    })
    await admin.disconnect()
    if (DEBUG_MODE) {
      console.log(`[Kafka Health Check] Topic '${topicName}' deleted`)
    }
    return true
  } catch (error) {
    if (DEBUG_MODE) {
      console.error(
        `[Kafka Health Check] Failed to delete topic '${topicName}':`,
        error.message
      )
    }
    return false
  }
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@health-check/services/kafka.js` around lines 59 - 89, The deleteTopic
function performs admin.connect(), admin.deleteTopics(), and admin.disconnect()
without timeouts which can hang; update deleteTopic to enforce timeouts by
either passing KafkaJS connection/request timeout options when constructing the
Kafka client (e.g., set connectionTimeout/requestTimeout) or wrap each admin
call (admin.connect, admin.deleteTopics, admin.disconnect) in a promise-race
with a configurable timeout and ensure admin.disconnect is attempted in finally;
reference the deleteTopic function and the
admin.connect/admin.deleteTopics/admin.disconnect calls when applying the
timeout logic and return false if any call times out.
Summary by CodeRabbit
New Features
Improvements
Chores