OTel: Task Processor #184

@khvn26

When a Django view enqueues a task, the OTel trace context (TraceID, SpanID, baggage) is available. By the time the task processor picks it up, that context is gone. This means:

  • Task spans can't be linked to the original request trace
  • structlog events emitted during task execution have empty TraceID/SpanID
  • W3C baggage (e.g. amplitude.device_id, amplitude.session_id) is lost

We want to propagate OTel trace context through task serialisation using OTel's propagate.inject / propagate.extract API.

Add a nullable trace_context JSONField to AbstractBaseTask, defaulting to an empty dict (in Django, pass the callable default=dict rather than a shared {} literal). This requires a migration.

When creating a Task, inject the current OTel context into a carrier dict and store it alongside the task:

from opentelemetry import propagate

carrier: dict[str, str] = {}
propagate.inject(carrier)
# Store carrier in task.trace_context field

In _run_task(), extract the context and activate it before calling the task function:

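from opentelemetry import propagate, trace

# trace_context is nullable, so fall back to an empty carrier for tasks without one.
ctx = propagate.extract(task.trace_context or {})
tracer = trace.get_tracer("task_processor")
# Start the task span as a child of the span that enqueued the task.
with tracer.start_as_current_span(task.task_identifier, context=ctx):
    task.run()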

The task processor should identify itself as flagsmith-task-processor in OTel resources. In ensure_cli_env(), derive the default service.name from the entrypoint: flagsmith-task-processor when task-processor is in sys.argv, flagsmith-api otherwise. The OTEL_SERVICE_NAME environment variable, when set, overrides both defaults.
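The service name rule could be sketched as a small pure function. The name default_service_name and its signature are hypothetical, not the existing ensure_cli_env() code. Taking argv and environ as parameters is only a convenience for testing.

```python
from collections.abc import Mapping, Sequence


def default_service_name(argv: Sequence[str], environ: Mapping[str, str]) -> str:
    """Pick the OTel service.name for the current process (hypothetical helper)."""
    # An explicitly configured OTEL_SERVICE_NAME always wins.
    explicit = environ.get("OTEL_SERVICE_NAME")
    if explicit:
        return explicit
    # Otherwise derive the default from the entrypoint arguments.
    if "task-processor" in argv:
        return "flagsmith-task-processor"
    return "flagsmith-api"
```

In ensure_cli_env() this would presumably be called with sys.argv and os.environ, and the result used to populate the service.name resource attribute.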
