Skip to content

Commit 64a80b4

Browse files
feat(metrics): emit workflow execution and per-block metrics to CloudWatch
1 parent f7811f8 commit 64a80b4

6 files changed

Lines changed: 463 additions & 45 deletions

File tree

apps/sim/app/api/cron/cleanup-stale-executions/route.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { verifyCronAuth } from '@/lib/auth/internal'
88
import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs'
99
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
1010
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
11+
import { workflowMetrics } from '@/lib/monitoring/metrics'
1112

1213
const logger = createLogger('CleanupStaleExecutions')
1314

@@ -32,6 +33,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
3233
executionId: workflowExecutionLogs.executionId,
3334
workflowId: workflowExecutionLogs.workflowId,
3435
startedAt: workflowExecutionLogs.startedAt,
36+
trigger: workflowExecutionLogs.trigger,
3537
})
3638
.from(workflowExecutionLogs)
3739
.where(
@@ -72,6 +74,13 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
7274
staleDurationMinutes,
7375
})
7476

77+
// Crashed workers never reach a LoggingSession completion path, so this
78+
// is the only place these failures can be counted toward the error rate.
79+
workflowMetrics.recordExecutionCompleted({
80+
trigger: execution.trigger,
81+
status: 'failed',
82+
})
83+
7584
cleaned++
7685
} catch (error) {
7786
logger.error(`Failed to clean up execution ${execution.executionId}:`, {

apps/sim/executor/execution/block-executor.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { redactApiKeys } from '@/lib/core/security/redaction'
44
import { normalizeStringArray } from '@/lib/core/utils/arrays'
55
import { getBaseUrl } from '@/lib/core/utils/urls'
66
import { compactExecutionPayload } from '@/lib/execution/payloads/serializer'
7+
import { workflowMetrics } from '@/lib/monitoring/metrics'
78
import {
89
containsUserFileWithMetadata,
910
hydrateUserFilesWithBase64,
@@ -239,6 +240,7 @@ export class BlockExecutor {
239240
if (normalizedOutput.childTraceSpans && Array.isArray(normalizedOutput.childTraceSpans)) {
240241
blockLog.childTraceSpans = normalizedOutput.childTraceSpans
241242
}
243+
this.recordBlockMetric(block, true, duration)
242244
}
243245

244246
const { childTraceSpans: _traces, ...outputForState } = normalizedOutput
@@ -284,6 +286,21 @@ export class BlockExecutor {
284286
}
285287
}
286288

289+
private recordBlockMetric(block: SerializedBlock, success: boolean, durationMs: number): void {
290+
const operation = block.config?.params?.operation
291+
workflowMetrics.recordBlockExecuted({
292+
blockType: block.metadata?.id || 'unknown',
293+
// Operation is user-configured; only emit registry-style identifiers so
294+
// dynamic values can't explode CloudWatch dimension cardinality.
295+
operation:
296+
typeof operation === 'string' && /^[a-zA-Z0-9_-]{1,64}$/.test(operation)
297+
? operation
298+
: undefined,
299+
success,
300+
durationMs,
301+
})
302+
}
303+
287304
private buildNodeMetadata(node: DAGNode): WorkflowNodeMetadata {
288305
const metadata = node?.metadata ?? {}
289306
return {
@@ -371,6 +388,7 @@ export class BlockExecutor {
371388
if (ChildWorkflowError.isChildWorkflowError(error) && error.childTraceSpans.length > 0) {
372389
blockLog.childTraceSpans = error.childTraceSpans
373390
}
391+
this.recordBlockMetric(block, false, duration)
374392
}
375393

376394
this.execLogger.error(

apps/sim/lib/logs/execution/logging-session.test.ts

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,24 @@ const {
3939
completeWorkflowExecutionMock,
4040
startWorkflowExecutionMock,
4141
loadWorkflowStateForExecutionMock,
42+
recordExecutionStartedMock,
43+
recordExecutionCompletedMock,
44+
recordExecutionPausedMock,
4245
} = vi.hoisted(() => ({
4346
completeWorkflowExecutionMock: vi.fn(),
4447
startWorkflowExecutionMock: vi.fn(),
4548
loadWorkflowStateForExecutionMock: vi.fn(),
49+
recordExecutionStartedMock: vi.fn(),
50+
recordExecutionCompletedMock: vi.fn(),
51+
recordExecutionPausedMock: vi.fn(),
52+
}))
53+
54+
vi.mock('@/lib/monitoring/metrics', () => ({
55+
workflowMetrics: {
56+
recordExecutionStarted: recordExecutionStartedMock,
57+
recordExecutionCompleted: recordExecutionCompletedMock,
58+
recordExecutionPaused: recordExecutionPausedMock,
59+
},
4660
}))
4761

4862
vi.mock('@sim/db', () => ({
@@ -648,3 +662,118 @@ describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => {
648662
expect(combined).toContain('force_failed')
649663
})
650664
})
665+
666+
describe('LoggingSession workflow metrics', () => {
667+
beforeEach(() => {
668+
vi.clearAllMocks()
669+
startWorkflowExecutionMock.mockResolvedValue({})
670+
completeWorkflowExecutionMock.mockResolvedValue({})
671+
loadWorkflowStateForExecutionMock.mockResolvedValue({
672+
blocks: {},
673+
edges: [],
674+
loops: {},
675+
parallels: {},
676+
})
677+
dbMocks.selectLimit.mockResolvedValue([{ status: 'running' }])
678+
dbMocks.execute.mockResolvedValue(undefined)
679+
})
680+
681+
it('emits ExecutionStarted on start and not on resume', async () => {
682+
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
683+
await session.start({ workspaceId: 'ws-1' })
684+
expect(recordExecutionStartedMock).toHaveBeenCalledTimes(1)
685+
expect(recordExecutionStartedMock).toHaveBeenCalledWith({ trigger: 'api' })
686+
687+
recordExecutionStartedMock.mockClear()
688+
const resumeSession = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
689+
await resumeSession.start({ workspaceId: 'ws-1', skipLogCreation: true })
690+
expect(recordExecutionStartedMock).not.toHaveBeenCalled()
691+
})
692+
693+
it('emits a success completion with trigger and duration', async () => {
694+
const session = new LoggingSession('wf-1', 'exec-1', 'webhook', 'req-1')
695+
await session.complete({ totalDurationMs: 500 })
696+
697+
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
698+
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
699+
trigger: 'webhook',
700+
status: 'success',
701+
durationMs: 500,
702+
})
703+
})
704+
705+
it('emits a failed completion via completeWithError', async () => {
706+
const session = new LoggingSession('wf-1', 'exec-1', 'schedule', 'req-1')
707+
await session.completeWithError({ totalDurationMs: 250, error: { message: 'boom' } })
708+
709+
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
710+
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
711+
trigger: 'schedule',
712+
status: 'failed',
713+
durationMs: 250,
714+
})
715+
})
716+
717+
it('emits a cancelled completion via completeWithCancellation', async () => {
718+
const session = new LoggingSession('wf-1', 'exec-1', 'manual', 'req-1')
719+
await session.completeWithCancellation({ totalDurationMs: 100 })
720+
721+
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
722+
trigger: 'manual',
723+
status: 'cancelled',
724+
durationMs: 100,
725+
})
726+
})
727+
728+
it('emits ExecutionPaused (not a completion) on pause, then failed if markAsFailed follows', async () => {
729+
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
730+
await session.completeWithPause({ totalDurationMs: 100 })
731+
732+
expect(recordExecutionPausedMock).toHaveBeenCalledTimes(1)
733+
expect(recordExecutionPausedMock).toHaveBeenCalledWith({ trigger: 'api' })
734+
expect(recordExecutionCompletedMock).not.toHaveBeenCalled()
735+
736+
await session.markAsFailed('pause persistence failed')
737+
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
738+
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
739+
trigger: 'api',
740+
status: 'failed',
741+
durationMs: undefined,
742+
})
743+
})
744+
745+
it('does not double-emit when markAsFailed runs after a completed session', async () => {
746+
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
747+
await session.complete({ totalDurationMs: 500 })
748+
await session.markAsFailed('timeout')
749+
750+
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
751+
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
752+
trigger: 'api',
753+
status: 'success',
754+
durationMs: 500,
755+
})
756+
})
757+
758+
it('emits exactly one completion when the primary write fails and the fallback succeeds', async () => {
759+
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
760+
completeWorkflowExecutionMock
761+
.mockRejectedValueOnce(new Error('finalize failed'))
762+
.mockResolvedValueOnce({})
763+
764+
await session.safeCompleteWithError({ error: { message: 'boom' } })
765+
766+
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
767+
expect(recordExecutionCompletedMock).toHaveBeenCalledWith(
768+
expect.objectContaining({ trigger: 'api', status: 'failed' })
769+
)
770+
})
771+
772+
it('skips the completion metric when the run was already cancelled elsewhere', async () => {
773+
dbMocks.selectLimit.mockResolvedValue([{ status: 'cancelled' }])
774+
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
775+
await session.completeWithError({ error: { message: 'boom' } })
776+
777+
expect(recordExecutionCompletedMock).not.toHaveBeenCalled()
778+
})
779+
})

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import type {
2121
TraceSpan,
2222
WorkflowState,
2323
} from '@/lib/logs/types'
24+
import { type WorkflowExecutionStatus, workflowMetrics } from '@/lib/monitoring/metrics'
2425
import type { SerializableExecutionState } from '@/executor/execution/types'
2526

2627
type TriggerData = Record<string, unknown> & {
@@ -137,6 +138,8 @@ export class LoggingSession {
137138
private completionAttemptFailed = false
138139
private pendingProgressWrites = new Set<Promise<void>>()
139140
private postExecutionPromise: Promise<void> | null = null
141+
/** Guards against double-counting ExecutionCompleted across completion paths */
142+
private completionMetricEmitted = false
140143

141144
constructor(
142145
workflowId: string,
@@ -218,6 +221,12 @@ export class LoggingSession {
218221
}
219222
}
220223

224+
private emitExecutionCompletedMetric(status: WorkflowExecutionStatus, durationMs?: number): void {
225+
if (this.completionMetricEmitted) return
226+
this.completionMetricEmitted = true
227+
workflowMetrics.recordExecutionCompleted({ trigger: this.triggerType, status, durationMs })
228+
}
229+
221230
private async completeExecutionWithFinalization(params: {
222231
endedAt: string
223232
totalDurationMs: number
@@ -325,6 +334,7 @@ export class LoggingSession {
325334
workflowState: this.workflowState,
326335
deploymentVersionId,
327336
})
337+
workflowMetrics.recordExecutionStarted({ trigger: this.triggerType })
328338
} else {
329339
// Resume: no cost reload needed. Billing reconciles from the usage_log
330340
// ledger (pre-pause rows already exist) plus the live cost summary.
@@ -364,6 +374,7 @@ export class LoggingSession {
364374
})
365375

366376
this.completed = true
377+
this.emitExecutionCompletedMetric('success', duration)
367378

368379
if (traceSpans && traceSpans.length > 0) {
369380
try {
@@ -500,6 +511,7 @@ export class LoggingSession {
500511
})
501512

502513
this.completed = true
514+
this.emitExecutionCompletedMetric('failed', Math.max(1, durationMs))
503515

504516
try {
505517
const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import(
@@ -593,6 +605,7 @@ export class LoggingSession {
593605
})
594606

595607
this.completed = true
608+
this.emitExecutionCompletedMetric('cancelled', Math.max(1, durationMs))
596609

597610
try {
598611
const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import(
@@ -688,6 +701,7 @@ export class LoggingSession {
688701
})
689702

690703
this.completed = true
704+
workflowMetrics.recordExecutionPaused({ trigger: this.triggerType })
691705

692706
try {
693707
const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import(
@@ -779,6 +793,7 @@ export class LoggingSession {
779793
workflowState: this.workflowState,
780794
deploymentVersionId,
781795
})
796+
workflowMetrics.recordExecutionStarted({ trigger: this.triggerType })
782797

783798
if (this.requestId) {
784799
logger.debug(
@@ -969,6 +984,7 @@ export class LoggingSession {
969984
this.requestId,
970985
this.workflowId
971986
)
987+
this.emitExecutionCompletedMetric('failed')
972988
}
973989

974990
static async markExecutionAsFailed(
@@ -1056,6 +1072,15 @@ export class LoggingSession {
10561072

10571073
this.completed = true
10581074

1075+
if (params.status === 'pending') {
1076+
workflowMetrics.recordExecutionPaused({ trigger: this.triggerType })
1077+
} else {
1078+
this.emitExecutionCompletedMetric(
1079+
params.status === 'failed' || params.status === 'cancelled' ? params.status : 'success',
1080+
params.totalDurationMs || 0
1081+
)
1082+
}
1083+
10591084
logger.info(
10601085
`[${this.requestId || 'unknown'}] Cost-only fallback succeeded for execution ${this.executionId}`
10611086
)

0 commit comments

Comments
 (0)