Skip to content

Commit a8aabdc

Browse files
committed
squash: adding more tests that mock the kafka calls
1 parent 90e5fe1 commit a8aabdc

3 files changed

Lines changed: 120 additions & 1 deletion

File tree

workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.test.ts

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,14 @@
1616

1717
import { LoggerService } from '@backstage/backend-plugin-api';
1818

19+
import { OrchestratorKafkaServiceOptions } from '../types/kafka';
1920
import { DataIndexService } from './DataIndexService';
2021
import { SonataFlowService } from './SonataFlowService';
2122

23+
jest.mock('node:crypto', () => ({
24+
randomUUID: () => '12345',
25+
}));
26+
2227
describe('SonataFlowService', () => {
2328
let loggerMock: jest.Mocked<LoggerService>;
2429
let dataIndexServiceMock: jest.Mocked<DataIndexService>;
@@ -155,6 +160,116 @@ describe('SonataFlowService', () => {
155160
);
156161
});
157162
});
163+
164+
describe('executeWorkflowAsCloudEvent', () => {
165+
const runErrorTestAsCloudEventNoKafkaImplementation =
166+
async (): Promise<void> => {
167+
try {
168+
await sonataFlowService.executeWorkflowAsCloudEvent({
169+
definitionId,
170+
workflowSource: 'workflowSource',
171+
workflowEventType: 'workflowEventType',
172+
contextAttribute: 'contextAttribute',
173+
});
174+
} catch (error) {
175+
throw error;
176+
}
177+
};
178+
beforeEach(() => {
179+
jest.clearAllMocks();
180+
});
181+
182+
it('should return the an error when no orchestrator kafka config is implemented', async () => {
183+
let result;
184+
try {
185+
await runErrorTestAsCloudEventNoKafkaImplementation();
186+
} catch (error: any) {
187+
result = error;
188+
}
189+
190+
expect(result).toBeDefined();
191+
expect(result.message).toEqual(
192+
'No Orchestrator kafka implementation added',
193+
);
194+
});
195+
it('should return the contextAttributeId on successful send', async () => {
196+
const kafkaServiceOptionsMock: OrchestratorKafkaServiceOptions = {
197+
clientId: 'kafkaClientId',
198+
brokers: ['localhost:9091'],
199+
};
200+
const sonataFlowServiceWithKafka = new SonataFlowService(
201+
dataIndexServiceMock,
202+
loggerMock,
203+
kafkaServiceOptionsMock,
204+
);
205+
const spy = jest
206+
.spyOn(
207+
sonataFlowServiceWithKafka.getOrchestratorKafkaImpl() as any,
208+
'producer',
209+
)
210+
.mockImplementation(() => {
211+
return {
212+
connect: jest.fn(),
213+
send: jest.fn(),
214+
disconnect: jest.fn(),
215+
};
216+
});
217+
const result =
218+
await sonataFlowServiceWithKafka.executeWorkflowAsCloudEvent({
219+
definitionId,
220+
workflowSource: 'workflowSource',
221+
workflowEventType: 'workflowEventType',
222+
contextAttribute: 'lockid',
223+
});
224+
expect(spy).toHaveBeenCalled();
225+
expect(result).toBeDefined();
226+
expect(result?.id).toBeDefined();
227+
expect(result?.id).toEqual('12345');
228+
});
229+
230+
it('should error on a bad connection', async () => {
231+
const kafkaServiceOptionsMock: OrchestratorKafkaServiceOptions = {
232+
clientId: 'kafkaClientId',
233+
brokers: ['localhost:9091'],
234+
};
235+
const sonataFlowServiceWithKafka = new SonataFlowService(
236+
dataIndexServiceMock,
237+
loggerMock,
238+
kafkaServiceOptionsMock,
239+
);
240+
jest
241+
.spyOn(
242+
sonataFlowServiceWithKafka.getOrchestratorKafkaImpl() as any,
243+
'producer',
244+
)
245+
.mockImplementation(() => {
246+
return {
247+
connect: jest
248+
.fn()
249+
.mockRejectedValue(new Error('Wrong Connection Info')),
250+
send: jest.fn(),
251+
disconnect: jest.fn(),
252+
};
253+
});
254+
let result;
255+
try {
256+
result = await sonataFlowServiceWithKafka.executeWorkflowAsCloudEvent({
257+
definitionId,
258+
workflowSource: 'workflowSource',
259+
workflowEventType: 'workflowEventType',
260+
contextAttribute: 'lockid',
261+
});
262+
} catch (error: any) {
263+
result = error;
264+
}
265+
266+
expect(result).toBeDefined();
267+
expect(result.message).toEqual(
268+
'Error with Kafka client with connection Options: clientId: kafkaClientId and broker: ["localhost:9091"]',
269+
);
270+
});
271+
});
272+
158273
describe('executeWorkflow', () => {
159274
const inputData = { var1: 'value1' };
160275
const urlToFetch = 'http://example.com/workflows/workflow-123';

workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ export class SonataFlowService {
5656
}
5757
}
5858

59+
getOrchestratorKafkaImpl() {
60+
return this.orchestratorKafkaImpl;
61+
}
62+
5963
public async fetchWorkflowInfoOnService(args: {
6064
definitionId: string;
6165
serviceUrl: string;

workspaces/orchestrator/plugins/orchestrator-backend/src/types/kafka.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { ConnectionOptions } from 'tls';
1919
export type OrchestratorKafkaServiceOptions = {
2020
clientId: string;
2121
brokers: string[];
22-
logLevel: number;
22+
logLevel?: number;
2323
ssl?: SslConfig;
2424
sasl?: SaslConfig;
2525
};

0 commit comments

Comments
 (0)