Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions airflow-core/src/airflow/example_dags/example_failed_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating task failure to generate exception traces in logs."""

from __future__ import annotations

import pendulum

from airflow.sdk import DAG, task


@task
def fail_task() -> None:
    """A task that always fails to generate error_detail.

    Raises:
        RuntimeError: always. The deliberate failure makes the task runner
            record an exception trace for this task's logs, which downstream
            log-reading tests inspect (presumably as a structured
            ``error_detail`` entry — confirm against the log handler).
    """
    raise RuntimeError("This is a test exception for stacktrace rendering")


# A run-once DAG whose single task always raises, so its logs contain an
# exception trace for the remote-logging tests to look for.
with DAG(
    "example_failed_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@once",
    catchup=False,
    tags=["example"],
) as dag:
    # Instantiate the failing task inside the DAG context.
    fail_task()
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,45 @@ def test_remote_logging_elasticsearch(self):
assert any(self.expected_message in event for event in events), (
f"Expected task logs to contain {self.expected_message!r}, got events: {events}"
)

def test_remote_logging_elasticsearch_error_detail(self):
    """Test that log error_detail is retrieved correctly from Elasticsearch."""
    dag_id = "example_failed_dag"
    task_id = "fail_task"

    # Trigger one run of the intentionally-failing example DAG.
    self.airflow_client.un_pause_dag(dag_id)
    trigger_resp = self.airflow_client.trigger_dag(
        dag_id,
        json={"logical_date": datetime.now(timezone.utc).isoformat()},
    )
    run_id = trigger_resp["dag_run_id"]

    final_state = self.airflow_client.wait_for_dag_run(dag_id=dag_id, run_id=run_id)
    assert final_state == "failed"

    def _has_error_detail(item):
        # Only dict entries can carry the structured error payload.
        return isinstance(item, dict) and "error_detail" in item

    # Logs might take some time to appear in ES, so poll with a bounded retry.
    entries = []
    for _ in range(self.max_retries):
        task_logs_resp = self.airflow_client.get_task_logs(
            dag_id=dag_id,
            task_id=task_id,
            run_id=run_id,
        )
        entries = task_logs_resp.get("content", [])
        if any(_has_error_detail(item) for item in entries):
            break
        time.sleep(self.retry_interval_in_seconds)

    error_entries = [item for item in entries if _has_error_detail(item)]
    assert len(error_entries) > 0, (
        f"Expected error_detail in logs, but none found. Logs: {entries}"
    )

    error_detail = error_entries[0]["error_detail"]
    assert isinstance(error_detail, list), f"Expected error_detail to be a list, got {type(error_detail)}"
    assert len(error_detail) > 0, "Expected error_detail to have at least one exception"
    assert error_detail[0]["exc_type"] == "RuntimeError"
    assert "This is a test exception for stacktrace rendering" in error_detail[0]["exc_value"]
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,56 @@
# not exist, the task handler should use the log_id_template attribute instead.
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")

TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]
# ES hit fields forwarded into StructuredLogMessage (matched case-insensitively).
# "message" and "levelname" are legacy field names that _build_log_fields remaps
# to "event" and "level"; "error_detail" carries the structured exception trace.
TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger", "error_detail", "message", "levelname"]


def _format_error_detail(error_detail: Any) -> str | None:
"""Render the structured ``error_detail`` written by the Airflow 3 supervisor as a traceback string."""
if not error_detail:
return None
if not isinstance(error_detail, list):
return str(error_detail)

lines: list[str] = ["Traceback (most recent call last):"]
for exc_info in error_detail:
if not isinstance(exc_info, dict):
lines.append(str(exc_info))
continue
if exc_info.get("is_cause"):
lines.append("\nThe above exception was the direct cause of the following exception:\n")
lines.append("Traceback (most recent call last):")
for frame in exc_info.get("frames", []):
lines.append(
f' File "{frame.get("filename", "<unknown>")}", line {frame.get("lineno", "?")}, in {frame.get("name", "<unknown>")}'
)
exc_type = exc_info.get("exc_type", "")
exc_value = exc_info.get("exc_value", "")
if exc_type:
lines.append(f"{exc_type}: {exc_value}" if exc_value else exc_type)
return "\n".join(lines)


def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]:
    """Filter an ES hit to ``TASK_LOG_FIELDS`` and ensure compatibility with StructuredLogMessage.

    :param hit_dict: the raw Elasticsearch hit as a plain dict.
    :return: a dict safe to splat into ``StructuredLogMessage(**fields)``:
        ``@timestamp``/``levelname``/``message`` remapped to
        ``timestamp``/``level``/``event``, empty ``error_detail`` dropped.
    """
    fields = {k: v for k, v in hit_dict.items() if k.lower() in TASK_LOG_FIELDS or k == "@timestamp"}

    # Map @timestamp to timestamp. Always pop "@timestamp": it is not a valid
    # Python identifier, so if it lingered (hit carrying both keys) the later
    # StructuredLogMessage(**fields) expansion would fail on it.
    if "@timestamp" in fields:
        es_timestamp = fields.pop("@timestamp")
        if "timestamp" not in fields:
            fields["timestamp"] = es_timestamp

    # Map levelname to level
    if "levelname" in fields and "level" not in fields:
        fields["level"] = fields.pop("levelname")

    # Airflow 3 StructuredLogMessage requires 'event'; fall back to the legacy
    # 'message' field, or an empty string when neither is present.
    if "event" not in fields:
        fields["event"] = fields.pop("message", "")

    # Clean up error_detail if it's empty
    if "error_detail" in fields and not fields["error_detail"]:
        fields.pop("error_detail")
    return fields


VALID_ES_CONFIG_KEYS = set(inspect.signature(elasticsearch.Elasticsearch.__init__).parameters.keys())
# Remove `self` from the valid set of kwargs
Expand Down Expand Up @@ -356,9 +405,7 @@ def _read(

# Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects
message = header + [
StructuredLogMessage(
**{k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS}
)
StructuredLogMessage(**_build_log_fields(hit.to_dict()))
for hits in logs_by_host.values()
for hit in hits
]
Expand Down Expand Up @@ -668,7 +715,7 @@ def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMe
# Structured log messages
for hits in logs_by_host.values():
for hit in hits:
filtered = {k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS}
filtered = _build_log_fields(hit.to_dict())
message.append(json.dumps(filtered))

return header, message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import elasticsearch
import pytest

from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO, _render_log_id

# The ES service hostname as defined in scripts/ci/docker-compose/integration-elasticsearch.yml
ES_HOST = "http://elasticsearch:9200"
Expand Down Expand Up @@ -101,8 +101,8 @@ def test_upload_and_read(self, tmp_json_log_file, ti):
expected_messages = ["start", "processing", "end"]
for expected, log_message in zip(expected_messages, log_messages):
log_entry = json.loads(log_message)
assert "message" in log_entry
assert log_entry["message"] == expected
assert "event" in log_entry
assert log_entry["event"] == expected

def test_read_missing_log(self, ti):
"""Verify that a missing log returns the expected error message.
Expand All @@ -118,3 +118,30 @@ def test_read_missing_log(self, ti):
assert log_source_info == []
assert len(log_messages) == 1
assert "not found in Elasticsearch" in log_messages[0]

def test_read_error_detail_integration(self, ti):
    """Verify that error_detail is correctly retrieved and formatted in integration tests."""
    # Manually index one log document carrying a structured error_detail payload.
    structured_error = [
        {
            "is_cause": False,
            "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno": 13, "name": "log_and_raise"}],
            "exc_type": "RuntimeError",
            "exc_value": "Woopsie. Something went wrong.",
        }
    ]
    document = {
        "event": "Task failed with exception",
        "log_id": _render_log_id(self.elasticsearch_io.log_id_template, ti, ti.try_number),
        "offset": 1,
        "error_detail": structured_error,
    }
    es_client = self.elasticsearch_io.client
    es_client.index(index=self.target_index, body=document)
    # Refresh so the freshly indexed document is immediately searchable.
    es_client.indices.refresh(index=self.target_index)

    _, log_messages = self.elasticsearch_io.read("", ti)

    assert len(log_messages) == 1
    parsed = json.loads(log_messages[0])
    assert "error_detail" in parsed
    assert parsed["error_detail"] == structured_error
Loading
Loading