Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/kafka/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@message-queue-toolkit/kafka",
"version": "0.11.4",
"engines": {
"node": ">= 22.14.0"
"node": ">= 22.22.0"

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be compatible with @platformatic/kafka requirement

},
"private": false,
"license": "MIT",
Expand Down Expand Up @@ -53,7 +53,7 @@
"dependencies": {
"@lokalise/node-core": "^14.8.1",
"@lokalise/universal-ts-utils": "^4.10.0",
"@platformatic/kafka": "^2.2.3"
"@platformatic/kafka": "^2.3.1"
},
"peerDependencies": {
"@message-queue-toolkit/core": ">=26.1.1",
Expand Down
10 changes: 2 additions & 8 deletions packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
type PermissionAdded,
TOPICS,
} from '../utils/permissionSchemas.ts'
import { createTestContext, type TestContext } from '../utils/testContext.ts'
import { createTestContext, deleteExistingTopics, type TestContext } from '../utils/testContext.ts'
import { PermissionBatchConsumer } from './PermissionBatchConsumer.ts'

describe('PermissionBatchConsumer', () => {
Expand All @@ -35,13 +35,7 @@ describe('PermissionBatchConsumer', () => {

describe('init - close', () => {
beforeEach(async () => {
try {
await testContext.cradle.kafkaAdmin.deleteTopics({
topics: TOPICS,
})
} catch (_) {
// Ignore errors if the topic does not exist
}
await deleteExistingTopics(testContext.cradle.kafkaAdmin, TOPICS)
})

it('should thrown an error if topics is empty', async () => {
Expand Down
10 changes: 2 additions & 8 deletions packages/kafka/test/consumer/PermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
type PermissionAdded,
TOPICS,
} from '../utils/permissionSchemas.ts'
import { createTestContext, type TestContext } from '../utils/testContext.ts'
import { createTestContext, deleteExistingTopics, type TestContext } from '../utils/testContext.ts'
import { PermissionConsumer } from './PermissionConsumer.ts'

describe('PermissionConsumer', () => {
Expand All @@ -32,13 +32,7 @@ describe('PermissionConsumer', () => {

describe('init - close', () => {
beforeEach(async () => {
try {
await testContext.cradle.kafkaAdmin.deleteTopics({
topics: TOPICS,
})
} catch (_) {
// Ignore errors if the topic does not exist
}
await deleteExistingTopics(testContext.cradle.kafkaAdmin, TOPICS)
})

it('should thrown an error if topics is empty', async () => {
Expand Down
10 changes: 2 additions & 8 deletions packages/kafka/test/publisher/PermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
type PermissionRemoved,
TOPICS,
} from '../utils/permissionSchemas.ts'
import { createTestContext, type TestContext } from '../utils/testContext.ts'
import { createTestContext, deleteExistingTopics, type TestContext } from '../utils/testContext.ts'
import { PermissionPublisher } from './PermissionPublisher.ts'

describe('PermissionPublisher', () => {
Expand All @@ -28,13 +28,7 @@ describe('PermissionPublisher', () => {

describe('init - close', () => {
beforeEach(async () => {
try {
await testContext.cradle.kafkaAdmin.deleteTopics({
topics: TOPICS,
})
} catch (_) {
// Ignore errors if the topic does not exist
}
await deleteExistingTopics(testContext.cradle.kafkaAdmin, TOPICS)
})

it('should thrown an error if topics is empty', () => {
Expand Down
17 changes: 17 additions & 0 deletions packages/kafka/test/utils/testContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@ export const getKafkaConfig = (): KafkaConfig => ({
clientId: randomUUID(),
})

/**
* Deletes only the topics that currently exist.
*
* Workaround for a regression in @platformatic/kafka >= 2.3.0 where Admin#deleteTopics

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be a simple fix for @platformatic/kafka, I will create PR for it. Since we only use topics deletion in tests and the workaround is clean (checking existing topics before deletion makes sense), it should be fine to proceed with the version upgrade

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR for the library: platformatic/kafka#323

* leaves a stale entry in its internal deduplication cache when the operation errors
* (e.g. deleting a non-existent topic). Reusing the same Admin instance afterwards causes
* subsequent deleteTopics calls for the same topics to hang. Filtering to existing topics
* avoids triggering that error path.
*/
export const deleteExistingTopics = async (kafkaAdmin: Admin, topics: string[]): Promise<void> => {
const existingTopics = await kafkaAdmin.listTopics()
const topicsToDelete = topics.filter((topic) => existingTopics.includes(topic))
if (topicsToDelete.length === 0) return

await kafkaAdmin.deleteTopics({ topics: topicsToDelete })
}

const resolveDIConfig = (awilixManager: AwilixManager): DiConfig => ({
awilixManager: asFunction(() => awilixManager, SINGLETON_CONFIG),
kafkaConfig: asFunction(getKafkaConfig, SINGLETON_CONFIG),
Expand Down
10 changes: 5 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading