Skip to content

kafka-topic-delete#389

Open
MallanagoudaB wants to merge 1 commit intomasterfrom
KafkaTopicDelete
Open

kafka-topic-delete#389
MallanagoudaB wants to merge 1 commit intomasterfrom
KafkaTopicDelete

Conversation

@MallanagoudaB
Copy link
Collaborator

@MallanagoudaB MallanagoudaB commented Mar 5, 2026

Summary by CodeRabbit

  • New Features

    • Kafka health check now includes automatic topic cleanup after successful checks, improving resource management.
  • Improvements

    • Enhanced health check flow with updated Kafka integration and streamlined topic lifecycle management.
  • Chores

    • Version bumped to 0.0.9.

@coderabbitai
Copy link

coderabbitai bot commented Mar 5, 2026

📝 Walkthrough

Walkthrough

Bumps health-check version from 0.0.8 to 0.0.9 and adds KafkaJS-based topic deletion to the Kafka health check service. Updates topic lifecycle management including creation logging, payload construction format, and consumer initialization, with topic cleanup added to the post-check workflow.

Changes

Cohort / File(s) Summary
Package Metadata
health-check/package.json
Version bump from 0.0.8 to 0.0.9
Kafka Service Enhancement
health-check/services/kafka.js
Adds KafkaJS-based deleteTopic function for topic cleanup after health checks. Updates ensureTopicExists with improved logging, changes producer.send payload to nested object format, modernizes consumer constructor, and integrates topic deletion into cleanup workflow.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

  • kafka-fix #364: Modifies health-check/services/kafka.js topic lifecycle and KafkaJS integration patterns similar to this PR.
  • kafka-topic-change #341: Updates health-check/services/kafka.js with dynamic topic name handling and topic lifecycle management complementary to these changes.

Suggested reviewers

  • VISHNUDAS-tunerlabs

Poem

🐰 A rabbit hops through Kafka streams so bright,
Topics dance and cleanup shines just right,
Version bumped with care, the version does grow,
Deletion flows clean—watch the messages go! ✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'kafka-topic-delete' directly and specifically summarizes the main change: adding Kafka topic deletion functionality via a new deleteTopic function used in cleanup.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch KafkaTopicDelete

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
health-check/services/kafka.js (1)

105-122: ⚠️ Potential issue | 🟠 Major

Cleanup awaits topic deletion, blocking health check resolution.

The cleanup function now awaits deleteTopic() before calling resolve(val). If topic deletion hangs (network issues, broker unresponsive), the health check will never complete. Since health checks are typically time-sensitive, consider making topic deletion fire-and-forget or applying a timeout.

♻️ Option 1: Fire-and-forget deletion (non-blocking)
 const cleanup = async (val) => {
 	if (resolved) return
 	resolved = true
 	try {
 		if (consumer) consumer.close(true)
 		if (client) client.close()
 
 		// Delete topic after successful health check
 		if (val === true) {
-			await deleteTopic(kafkaUrl, uniqueTopicName)
+			// Fire-and-forget: don't block health check result
+			deleteTopic(kafkaUrl, uniqueTopicName).catch(() => {})
 		}
 	} catch (e) {
 		if (DEBUG_MODE) {
 			console.error('[Kafka Health Check] Cleanup error:', e.message)
 		}
 	}
 	resolve(val)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@health-check/services/kafka.js` around lines 105 - 122, The cleanup function
blocks health-check resolution by awaiting deleteTopic; change it to
fire-and-forget so resolve(val) is called immediately: in cleanup (the resolved
guard, consumer.close, client.close logic), remove the await before
deleteTopic(kafkaUrl, uniqueTopicName), instead call deleteTopic(...) without
awaiting and attach a .catch(...) to log errors (use DEBUG_MODE and e.message)
so deletion failures don't block resolve(val); keep resolve(val) as the last
step in cleanup so the health check always finishes promptly.
🧹 Nitpick comments (2)
health-check/services/kafka.js (2)

95-95: Async Promise executor anti-pattern (pre-existing).

new Promise(async (resolve) => {...}) is an anti-pattern—exceptions thrown in the async executor won't reject the outer Promise. While the current code handles errors via try/catch and cleanup(false), consider refactoring to a plain async function in a future cleanup.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@health-check/services/kafka.js` at line 95, The Promise is created with an
async executor (return new Promise(async (resolve) => { ... })), which is an
anti-pattern; remove the async from the executor and either (A) make the
executor non-async and use explicit .then/.catch or try/catch with
Promise-returning calls and call resolve/reject accordingly, or (B) refactor the
surrounding function into an async function that uses await and returns directly
instead of wrapping in new Promise. Locate the promise creation in
health-check/services/kafka.js (the block that calls cleanup(false) and uses
resolve) and implement one of these fixes so thrown errors are properly
propagated and handled.

8-9: Consider consolidating to a single Kafka client library.

The module now uses both kafka-node (for producer/consumer) and kafkajs (for admin). While this works, maintaining two client libraries increases complexity and dependency footprint. If feasible in the future, consider migrating entirely to KafkaJS, which supports all required operations.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@health-check/services/kafka.js` around lines 8 - 9, The file currently
requires two clients—const kafka = require('kafka-node') and const { Kafka } =
require('kafkajs')—which should be consolidated; remove the kafka-node require
and refactor any code that uses the kafka variable (producer/consumer creation
and methods) to use KafkaJS equivalents (instantiate new Kafka({...}), then use
kafka.producer(), kafka.consumer(), and kafka.admin()) so the module only
depends on the Kafka class from kafkajs; update variable names and method calls
accordingly to match KafkaJS APIs where producer/consumer/admin are used.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@health-check/services/kafka.js`:
- Around line 59-89: The deleteTopic function performs admin.connect(),
admin.deleteTopics(), and admin.disconnect() without timeouts which can hang;
update deleteTopic to enforce timeouts by either passing KafkaJS
connection/request timeout options when constructing the Kafka client (e.g., set
connectionTimeout/requestTimeout) or wrap each admin call (admin.connect,
admin.deleteTopics, admin.disconnect) in a promise-race with a configurable
timeout and ensure admin.disconnect is attempted in finally; reference the
deleteTopic function and the admin.connect/admin.deleteTopics/admin.disconnect
calls when applying the timeout logic and return false if any call times out.

---

Outside diff comments:
In `@health-check/services/kafka.js`:
- Around line 105-122: The cleanup function blocks health-check resolution by
awaiting deleteTopic; change it to fire-and-forget so resolve(val) is called
immediately: in cleanup (the resolved guard, consumer.close, client.close
logic), remove the await before deleteTopic(kafkaUrl, uniqueTopicName), instead
call deleteTopic(...) without awaiting and attach a .catch(...) to log errors
(use DEBUG_MODE and e.message) so deletion failures don't block resolve(val);
keep resolve(val) as the last step in cleanup so the health check always
finishes promptly.

---

Nitpick comments:
In `@health-check/services/kafka.js`:
- Line 95: The Promise is created with an async executor (return new
Promise(async (resolve) => { ... })), which is an anti-pattern; remove the async
from the executor and either (A) make the executor non-async and use explicit
.then/.catch or try/catch with Promise-returning calls and call resolve/reject
accordingly, or (B) refactor the surrounding function into an async function
that uses await and returns directly instead of wrapping in new Promise. Locate
the promise creation in health-check/services/kafka.js (the block that calls
cleanup(false) and uses resolve) and implement one of these fixes so thrown
errors are properly propagated and handled.
- Around line 8-9: The file currently requires two clients—const kafka =
require('kafka-node') and const { Kafka } = require('kafkajs')—which should be
consolidated; remove the kafka-node require and refactor any code that uses the
kafka variable (producer/consumer creation and methods) to use KafkaJS
equivalents (instantiate new Kafka({...}), then use kafka.producer(),
kafka.consumer(), and kafka.admin()) so the module only depends on the Kafka
class from kafkajs; update variable names and method calls accordingly to match
KafkaJS APIs where producer/consumer/admin are used.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9e1198e4-5d7a-4e9f-8c94-2c18577ca3cd

📥 Commits

Reviewing files that changed from the base of the PR and between 76a0fb2 and a0310c0.

📒 Files selected for processing (2)
  • health-check/package.json
  • health-check/services/kafka.js

Comment on lines +59 to +89
async function deleteTopic(kafkaUrl, topicName) {
try {
const kafkaClient = new Kafka({
brokers: [kafkaUrl],
})

const admin = kafkaClient.admin()

await admin.connect()

await admin.deleteTopics({
topics: [topicName],
})

await admin.disconnect()

if (DEBUG_MODE) {
console.log(`[Kafka Health Check] Topic '${topicName}' deleted`)
}

return true
} catch (error) {
if (DEBUG_MODE) {
console.error(
`[Kafka Health Check] Failed to delete topic '${topicName}':`,
error.message
)
}
return false
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Admin operations lack timeout, risking indefinite hang.

The admin.connect(), admin.deleteTopics(), and admin.disconnect() calls have no timeout. If the broker is unresponsive during cleanup, this will block indefinitely, causing the health check to never resolve.

Consider adding a timeout wrapper or using KafkaJS connection timeout options:

🛡️ Proposed fix with timeout
 async function deleteTopic(kafkaUrl, topicName) {
 	try {
 		const kafkaClient = new Kafka({
 			brokers: [kafkaUrl],
+			connectionTimeout: 5000,
+			requestTimeout: 10000,
 		})
 
 		const admin = kafkaClient.admin()
 
 		await admin.connect()
 
 		await admin.deleteTopics({
 			topics: [topicName],
+			timeout: 5000,
 		})
 
 		await admin.disconnect()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async function deleteTopic(kafkaUrl, topicName) {
try {
const kafkaClient = new Kafka({
brokers: [kafkaUrl],
})
const admin = kafkaClient.admin()
await admin.connect()
await admin.deleteTopics({
topics: [topicName],
})
await admin.disconnect()
if (DEBUG_MODE) {
console.log(`[Kafka Health Check] Topic '${topicName}' deleted`)
}
return true
} catch (error) {
if (DEBUG_MODE) {
console.error(
`[Kafka Health Check] Failed to delete topic '${topicName}':`,
error.message
)
}
return false
}
}
async function deleteTopic(kafkaUrl, topicName) {
try {
const kafkaClient = new Kafka({
brokers: [kafkaUrl],
connectionTimeout: 5000,
requestTimeout: 10000,
})
const admin = kafkaClient.admin()
await admin.connect()
await admin.deleteTopics({
topics: [topicName],
timeout: 5000,
})
await admin.disconnect()
if (DEBUG_MODE) {
console.log(`[Kafka Health Check] Topic '${topicName}' deleted`)
}
return true
} catch (error) {
if (DEBUG_MODE) {
console.error(
`[Kafka Health Check] Failed to delete topic '${topicName}':`,
error.message
)
}
return false
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@health-check/services/kafka.js` around lines 59 - 89, The deleteTopic
function performs admin.connect(), admin.deleteTopics(), and admin.disconnect()
without timeouts which can hang; update deleteTopic to enforce timeouts by
either passing KafkaJS connection/request timeout options when constructing the
Kafka client (e.g., set connectionTimeout/requestTimeout) or wrap each admin
call (admin.connect, admin.deleteTopics, admin.disconnect) in a promise-race
with a configurable timeout and ensure admin.disconnect is attempted in finally;
reference the deleteTopic function and the
admin.connect/admin.deleteTopics/admin.disconnect calls when applying the
timeout logic and return false if any call times out.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant