feat(logs): [SVLS-8582] Hold logs and add durable context to durable function logs#1053
feat(logs): [SVLS-8582] Hold logs and add durable context to durable function logs#1053
Conversation
When the Lambda runtime is identified as a Durable Function (runtime_version contains "DurableFunction"), the logs processor now: - Maintains an Option<bool> is_durable_function flag that starts as None (unknown) until PlatformInitStart sets it to Some(true/false), with a PlatformStart fallback to Some(false) to avoid holding logs forever. - Holds logs in pending_durable_logs while the flag is None, draining them once the flag is resolved (avoids a race where logs arrive before the PlatformInitStart event). - Maintains a durable_id_map (capacity 5, FIFO eviction) from request_id to (durable_execution_id, durable_execution_name), populated by parsing JSON log messages that carry those fields. - Skips flushing any log whose request_id is not yet in the map; once it is, appends durable_execution_id:<id>,durable_execution_name:<name> to the log's ddtags before queuing. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eyed by request_id - pending_durable_logs: Vec<IntakeLog> → held_logs: HashMap<String, Vec<IntakeLog>> Logs are now grouped by request_id, enabling O(1) lookup and targeted draining. - try_update_durable_map now returns bool (true = new entry added). The caller is responsible for draining held_logs when a new entry arrives, so the method stays focused on map maintenance only. - New drain_held_for_request_id: when a request_id's durable context first becomes known, immediately moves all stashed logs for that request_id into ready_logs with the appropriate durable tags, without touching the rest of held_logs. - New resolve_held_logs_on_durable_function_set: called at the exact moment is_durable_function transitions from None to Some(...) — inside PlatformInitStart and the PlatformStart fallback — rather than on every process() call. For Some(false) all held logs are flushed; for Some(true) the held logs are scanned for durable context and eligible ones are moved to ready_logs, ineligible ones remain. - queue_log_after_rules updated: in the None state logs with a request_id go into held_logs[request_id]; in the Some(true) state logs with no durable context yet are also stashed in held_logs[request_id] instead of being dropped. - process() no longer needs to drain pending logs on each call; that is now driven entirely by the state-transition helpers above. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nction PlatformInitStart is essentially guaranteed to arrive before any invocation on a cold start, so the fallback that set is_durable_function = Some(false) on PlatformStart was unnecessary. Update the three unit tests that skipped PlatformInitStart by setting is_durable_function = Some(false) directly (private field is accessible from the in-file test module), keeping each test focused on what it actually exercises rather than the cold-start init lifecycle. Update the integration test to prepend a platform.initStart event and call sync_consume once per event (previously it sent all events first and then drained once, which now works correctly only with the init event). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
0a7b272 to
299e972
Compare
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When an aws.lambda span (resource = dd-tracer-serverless-span) carries durable_function_execution_id and durable_function_execution_name meta tags, forward the context to the logs agent via a dedicated mpsc channel so held logs can be tagged and released. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… of tags Add durable_execution_id and durable_execution_name as fields on the Lambda struct so they appear as lambda.durable_execution_id and lambda.durable_execution_name attributes in the log payload, consistent with how lambda.request_id is represented. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Combine both sides: - Keep durable_context_tx (from this branch) in start_logs_agent return tuple and start_trace_agent parameter - Add shared_client / client parameter (from main) to start_logs_agent call, LogsFlusher::new, and start_trace_agent Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…durable runtime
The Lambda durable execution runtime rejects HTTP-style response dicts
({'statusCode': 200}) with "Invalid Status in invocation output." Return
None instead, which the runtime accepts.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Logs emitted through the durable execution SDK already carry an
`executionArn` field. Parse `durable_execution_name` and
`durable_execution_id` directly from the ARN in `get_message()` and
skip the hold/release cycle in `queue_log_after_rules()`, pushing such
logs straight to `ready_logs`.
ARN format:
arn:aws:lambda:{region}:{account}:function:{name}:{version}
/durable-execution/{exec_name}/{exec_id}
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Aligns with lifecycle::invocation::ContextBuffer default capacity of 500, which is sized to absorb async event backlog where invocation contexts may arrive out of order. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Can come in a later PR, but I think we should also add Durable Function Operation Name as an attribute in the logs when it is available. |
@jchrostek-dd For SDK logs, operation name and operation id already exist as log attributes: For non-SDK logs, we are not planning to add them to logs because it’s hard for the extension to get them. |
…g group Lambda durable execution requires Python 3.14+. The runtime version string "python:3.14.DurableFunction.v9" is what causes the extension to set is_durable_function=true. Python 3.13 produces no such string and durable context is never attached to logs. - defaultPythonRuntime: PYTHON_3_13 → PYTHON_3_14 - defaultPythonLayerArn: Datadog-Python313-ARM:123 → Datadog-Python314-ARM:123 - Log CloudWatch log group names in durable test beforeAll for easier debugging Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…gration test LogsAgent::new now returns a 3-tuple; update the integration test to match. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ering For durable functions, response.$metadata.requestId (the HTTP orchestration request ID) differs from the Lambda execution request ID stored as @lambda.request_id in Datadog logs. Durable functions also do not support LogType=Tail, so LogResult is absent. Extract the real execution request ID from tail logs (START RequestId: ...) when available. When LogResult is absent (durable functions), pass undefined to getLogs so it falls back to a service-only query, returning all logs for that function invocation without a request_id filter. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…f directly to logs agent Removes the direct trace_agent → logs_agent dependency by having the lifecycle processor (Processor) own durable_context_tx and forward durable execution context to the logs pipeline from add_tracer_span(). The trace agent now sends aws.lambda spans to the lifecycle processor (alongside INVOCATION_SPAN_RESOURCE spans) and the lifecycle processor handles the durable context extraction and forwarding. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… collapsible_if Bumps quinn-proto from 0.11.13 to 0.11.14 to address a DoS vulnerability (panic on invalid QUIC transport parameters). Collapses nested if-let in add_tracer_span() into a single let-chain to satisfy clippy::collapsible_if. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…e-python Clarifies the intent of the non-durable Lambda alongside the lambda_durable_function.py in the same directory. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Merge log-message, request_id, and no-durable-context assertions into one test per section (single log lookup) - Merge START/END platform log tests into a single loop over both markers - Merge the two cold-start durable_execution_id/name tests with the "held and released" test (all three checked the same log with the same assertions) Net result: 11 tests → 7 tests with no loss of coverage. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Remove warm-start and concurrent-invocation tests. A single invocation is sufficient to verify durable execution context on function, platform, and extension logs. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
bottlecap/src/traces/trace_agent.rs
Outdated
| } | ||
|
|
||
| if span.resource == INVOCATION_SPAN_RESOURCE | ||
| if (span.resource == INVOCATION_SPAN_RESOURCE || span.name == "aws.lambda") |
There was a problem hiding this comment.
This looks kinda dangerous, why are we doing this for?
There was a problem hiding this comment.
You are absolutely right to question this! I agree it would be better to add a separate ProcessorCommand to pass durable context instead of reusing an existing one. Let's talk in the upcoming meeting.
Instead of routing aws.lambda spans through add_tracer_span(), add a separate ForwardDurableContext command so the trace agent explicitly sends (request_id, execution_id, execution_name) to the lifecycle processor, which relays them to the logs pipeline. - add ProcessorCommand::ForwardDurableContext and the corresponding handle method and run() arm in processor_service.rs - split Processor::add_tracer_span() back to context-buffer only; add Processor::forward_durable_context() for the logs relay - in trace_agent handle_traces(), keep the INVOCATION_SPAN_RESOURCE branch unchanged and add a separate aws.lambda branch that calls forward_durable_context when durable metadata is present Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Wrap insert_to_durable_map + take_ready_logs + insert_batch into a single method, using tuple destructuring in the parameter to unpack DurableContextUpdate. Simplifies the agent.rs select branch to a single call. Also adds a TODO to rename process() to process_telemetry_event(). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ntext structs
Replace anonymous tuple types with named structs to avoid field-order
mistakes:
- DurableContextUpdate { request_id, execution_id, execution_name }
replaces (String, String, String); defined in logs::lambda and
imported everywhere (lifecycle processor, main, agent)
- DurableExecutionContext { execution_id, execution_name } replaces
(String, String) as the durable_context_map value in LambdaProcessor
Both structs derive Clone, allowing .cloned() on map lookups instead of
the verbose .map(|ctx| (ctx.execution_id.clone(), ctx.execution_name.clone())).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| } | ||
| } | ||
|
|
||
| // TODO: rename this method to process_telemetry_event() |
There was a problem hiding this comment.
Will rename it to process_telemetry_event() to be parallel to process_durable_context_update(). Not in this PR because this PR is already very big.
…ython to 3.14 layer Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Summary
If the function is a durable function, then add two attributes to every log:
lambda.durable_execution_idlambda.durable_execution_nameBackground
durable_function_execution_idanddurable_function_execution_nameto theaws.lambdaspanDetails
Data flow
TraceAgent::handle_traces()detects anaws.lambdaspan carryingrequest_id,durable_function_execution_id, anddurable_function_execution_namein its meta tagsProcessorCommand::ForwardDurableContext { request_id, execution_id, execution_name }toInvocationProcessorServiceProcessor::forward_durable_context()in the lifecycle processor relays this as aDurableContextUpdateto the logs pipeline via an mpsc channelLogsAgent::spin()receives the update and callsLogsProcessor::process_durable_context_update(), which inserts the entry intoLambdaProcessor::durable_context_mapand drains any held logs for thatrequest_idLog holding and draining
held_logs: HashMap<String, Vec<IntakeLog>>, keyed byrequest_iddurable_context_map: HashMap<String, DurableExecutionContext>mapsrequest_idto(execution_id, execution_name). It has a fixed capacity (500 entries) with FIFO eviction.PlatformInitStartevent, it learns whether the function is a durable function:request_idis already indurable_context_map(tag them withlambda.durable_execution_idandlambda.durable_execution_name); keep the rest held until their context arrivesdurable_context_map, any held logs for thatrequest_idare drained immediatelyTypes
DurableContextUpdate { request_id, execution_id, execution_name }— message sent from trace agent through lifecycle processor to logs pipelineDurableExecutionContext { execution_id, execution_name }— value type stored indurable_context_mapTest plan
Manual test
Steps
Build a layer, install it on a function, and invoke it.
Result
returns all the logs for two invocations of this durable execution. It returns 98 logs, equal to 49 logs for the first invocation + 49 logs for the second invocation. (query link)
Integration test
Passed the added integration test
Unit tests
Passed the added unit tests