diff --git a/packages/kafka/package.json b/packages/kafka/package.json index d9436bb7..e8be27d8 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -2,7 +2,7 @@ "name": "@message-queue-toolkit/kafka", "version": "0.11.4", "engines": { - "node": ">= 22.14.0" + "node": ">= 22.22.0" }, "private": false, "license": "MIT", @@ -53,7 +53,7 @@ "dependencies": { "@lokalise/node-core": "^14.8.1", "@lokalise/universal-ts-utils": "^4.10.0", - "@platformatic/kafka": "^2.2.3" + "@platformatic/kafka": "^2.3.1" }, "peerDependencies": { "@message-queue-toolkit/core": ">=26.1.1", diff --git a/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts b/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts index dc62bcab..90a56768 100644 --- a/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts +++ b/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts @@ -14,7 +14,7 @@ import { type PermissionAdded, TOPICS, } from '../utils/permissionSchemas.ts' -import { createTestContext, type TestContext } from '../utils/testContext.ts' +import { createTestContext, deleteExistingTopics, type TestContext } from '../utils/testContext.ts' import { PermissionBatchConsumer } from './PermissionBatchConsumer.ts' describe('PermissionBatchConsumer', () => { @@ -35,13 +35,7 @@ describe('PermissionBatchConsumer', () => { describe('init - close', () => { beforeEach(async () => { - try { - await testContext.cradle.kafkaAdmin.deleteTopics({ - topics: TOPICS, - }) - } catch (_) { - // Ignore errors if the topic does not exist - } + await deleteExistingTopics(testContext.cradle.kafkaAdmin, TOPICS) }) it('should thrown an error if topics is empty', async () => { diff --git a/packages/kafka/test/consumer/PermissionConsumer.spec.ts b/packages/kafka/test/consumer/PermissionConsumer.spec.ts index 10a756fd..bb2c3502 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.spec.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.spec.ts @@ -11,7 +11,7 @@ import { type PermissionAdded, TOPICS, } from '../utils/permissionSchemas.ts' -import { createTestContext, type TestContext } from '../utils/testContext.ts' +import { createTestContext, deleteExistingTopics, type TestContext } from '../utils/testContext.ts' import { PermissionConsumer } from './PermissionConsumer.ts' describe('PermissionConsumer', () => { @@ -32,13 +32,7 @@ describe('PermissionConsumer', () => { describe('init - close', () => { beforeEach(async () => { - try { - await testContext.cradle.kafkaAdmin.deleteTopics({ - topics: TOPICS, - }) - } catch (_) { - // Ignore errors if the topic does not exist - } + await deleteExistingTopics(testContext.cradle.kafkaAdmin, TOPICS) }) it('should thrown an error if topics is empty', async () => { diff --git a/packages/kafka/test/publisher/PermissionPublisher.spec.ts b/packages/kafka/test/publisher/PermissionPublisher.spec.ts index 790cf51a..3b445446 100644 --- a/packages/kafka/test/publisher/PermissionPublisher.spec.ts +++ b/packages/kafka/test/publisher/PermissionPublisher.spec.ts @@ -7,7 +7,7 @@ import { type PermissionRemoved, TOPICS, } from '../utils/permissionSchemas.ts' -import { createTestContext, type TestContext } from '../utils/testContext.ts' +import { createTestContext, deleteExistingTopics, type TestContext } from '../utils/testContext.ts' import { PermissionPublisher } from './PermissionPublisher.ts' describe('PermissionPublisher', () => { @@ -28,13 +28,7 @@ describe('PermissionPublisher', () => { describe('init - close', () => { beforeEach(async () => { - try { - await testContext.cradle.kafkaAdmin.deleteTopics({ - topics: TOPICS, - }) - } catch (_) { - // Ignore errors if the topic does not exist - } + await deleteExistingTopics(testContext.cradle.kafkaAdmin, TOPICS) }) it('should thrown an error if topics is empty', () => { diff --git a/packages/kafka/test/utils/testContext.ts b/packages/kafka/test/utils/testContext.ts index 8b3f34fb..f35187a7 100644 --- a/packages/kafka/test/utils/testContext.ts +++ b/packages/kafka/test/utils/testContext.ts @@ -55,6 +55,23 @@ export const getKafkaConfig = (): KafkaConfig => ({ clientId: randomUUID(), }) +/** + * Deletes only the topics that currently exist. + * + * Workaround for a regression in @platformatic/kafka >= 2.3.0 where Admin#deleteTopics + * leaves a stale entry in its internal deduplication cache when the operation errors + * (e.g. deleting a non-existent topic). Reusing the same Admin instance afterwards causes + * subsequent deleteTopics calls for the same topics to hang. Filtering to existing topics + * avoids triggering that error path. + */ +export const deleteExistingTopics = async (kafkaAdmin: Admin, topics: string[]): Promise => { + const existingTopics = await kafkaAdmin.listTopics() + const topicsToDelete = topics.filter((topic) => existingTopics.includes(topic)) + if (topicsToDelete.length === 0) return + + await kafkaAdmin.deleteTopics({ topics: topicsToDelete }) +} + const resolveDIConfig = (awilixManager: AwilixManager): DiConfig => ({ awilixManager: asFunction(() => awilixManager, SINGLETON_CONFIG), kafkaConfig: asFunction(getKafkaConfig, SINGLETON_CONFIG), diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2f07125f..215641aa 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -231,8 +231,8 @@ importers: specifier: ^4.10.0 version: 4.10.0 '@platformatic/kafka': - specifier: ^2.2.3 - version: 2.2.3 + specifier: ^2.3.1 + version: 2.3.1 devDependencies: '@biomejs/biome': specifier: ^2.4.16 @@ -1401,8 +1401,8 @@ packages: resolution: {integrity: sha512-xQD5JSkQc4D15suOk7FezGDmh6Vb5e4phDoFZ/ASpa3Ft+3fOVwFOENmHtC1561zm/xHnKwI91NbMF8inpbk0Q==} engines: {node: '>= 22.19.0'} - '@platformatic/kafka@2.2.3': - resolution: {integrity: sha512-gtBlwncPkj6rvE7Qnq68rWe33gm2gObvV1hgwI3rIRn8h4FTCZIeNZzy30Mg6HB80It/7+OjE2yztp7G1KqcrQ==} + '@platformatic/kafka@2.3.1': + resolution: {integrity: sha512-UHMHdsg19O2GwQKG0EpB3mwwodEVRE0M3D8wTyscc7N//zSwS9aHjMtvOJCCEJk60v6OqTfQ7WRgbeV/3cg0qQ==} engines: {node: '>= 22.22.0 || >= 24.6.0'} '@platformatic/wasm-utils@0.2.1': @@ -4336,7 +4336,7 @@ snapshots: '@platformatic/dynamic-buffer@0.3.1': {} - '@platformatic/kafka@2.2.3': + '@platformatic/kafka@2.3.1': dependencies: '@platformatic/dynamic-buffer': 0.3.1 '@platformatic/wasm-utils': 0.2.1