Skip to content

Commit e47ccdb

Browse files
committed
Fix missing traceback in Elasticsearch and OpenSearch task logs
closes: #63736
1 parent b87ed1d commit e47ccdb

5 files changed

Lines changed: 456 additions & 9 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix missing traceback in Elasticsearch and OpenSearch task logs

providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,46 @@
7979
# not exist, the task handler should use the log_id_template attribute instead.
8080
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
8181

82-
TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]
82+
TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger", "error_detail"]
83+
84+
85+
def _format_error_detail(error_detail: Any) -> str | None:
86+
"""Render the structured ``error_detail`` written by the Airflow 3 supervisor as a traceback string."""
87+
if not error_detail:
88+
return None
89+
if not isinstance(error_detail, list):
90+
return str(error_detail)
91+
92+
lines: list[str] = ["Traceback (most recent call last):"]
93+
for exc_info in error_detail:
94+
if not isinstance(exc_info, dict):
95+
lines.append(str(exc_info))
96+
continue
97+
if exc_info.get("is_cause"):
98+
lines.append("\nThe above exception was the direct cause of the following exception:\n")
99+
lines.append("Traceback (most recent call last):")
100+
for frame in exc_info.get("frames", []):
101+
lines.append(
102+
f' File "{frame.get("filename", "<unknown>")}", line {frame.get("lineno", "?")}, in {frame.get("name", "<unknown>")}'
103+
)
104+
exc_type = exc_info.get("exc_type", "")
105+
exc_value = exc_info.get("exc_value", "")
106+
if exc_type:
107+
lines.append(f"{exc_type}: {exc_value}" if exc_value else exc_type)
108+
return "\n".join(lines)
109+
110+
111+
def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]:
112+
"""Filter an ES hit to ``TASK_LOG_FIELDS``, formatting ``error_detail`` as a plain-text traceback."""
113+
fields = {k: v for k, v in hit_dict.items() if k.lower() in TASK_LOG_FIELDS}
114+
if "error_detail" in fields:
115+
formatted = _format_error_detail(fields["error_detail"])
116+
if formatted:
117+
fields["error_detail"] = formatted
118+
else:
119+
fields.pop("error_detail")
120+
return fields
121+
83122

84123
VALID_ES_CONFIG_KEYS = set(inspect.signature(elasticsearch.Elasticsearch.__init__).parameters.keys())
85124
# Remove `self` from the valid set of kwargs
@@ -356,9 +395,7 @@ def _read(
356395

357396
# Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects
358397
message = header + [
359-
StructuredLogMessage(
360-
**{k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS}
361-
)
398+
StructuredLogMessage(**_build_log_fields(hit.to_dict()))
362399
for hits in logs_by_host.values()
363400
for hit in hits
364401
]
@@ -668,7 +705,7 @@ def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMe
668705
# Structured log messages
669706
for hits in logs_by_host.values():
670707
for hit in hits:
671-
filtered = {k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS}
708+
filtered = _build_log_fields(hit.to_dict())
672709
message.append(json.dumps(filtered))
673710

674711
return header, message

providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,3 +1060,190 @@ def test_read_with_missing_log(self, mocked_count, ti):
10601060
assert log_source_info == []
10611061
assert f"*** Log {log_id} not found in Elasticsearch" in log_messages[0]
10621062
mocked_count.assert_called_once()
1063+
1064+
1065+
# ---------------------------------------------------------------------------
1066+
# Tests for the error_detail helpers (issue #63736)
1067+
# ---------------------------------------------------------------------------
1068+
1069+
1070+
class TestFormatErrorDetail:
1071+
"""Unit tests for _format_error_detail."""
1072+
1073+
def test_returns_none_for_empty(self):
1074+
from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail
1075+
1076+
assert _format_error_detail(None) is None
1077+
assert _format_error_detail([]) is None
1078+
1079+
def test_returns_string_for_non_list(self):
1080+
from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail
1081+
1082+
assert _format_error_detail("raw string") == "raw string"
1083+
1084+
def test_formats_single_exception(self):
1085+
from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail
1086+
1087+
error_detail = [
1088+
{
1089+
"is_cause": False,
1090+
"frames": [
1091+
{"filename": "/app/task.py", "lineno": 13, "name": "log_and_raise"},
1092+
],
1093+
"exc_type": "RuntimeError",
1094+
"exc_value": "Something went wrong.",
1095+
"exceptions": [],
1096+
"is_group": False,
1097+
}
1098+
]
1099+
result = _format_error_detail(error_detail)
1100+
assert result is not None
1101+
assert "Traceback (most recent call last):" in result
1102+
assert 'File "/app/task.py", line 13, in log_and_raise' in result
1103+
assert "RuntimeError: Something went wrong." in result
1104+
1105+
def test_formats_chained_exceptions(self):
1106+
from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail
1107+
1108+
error_detail = [
1109+
{
1110+
"is_cause": True,
1111+
"frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}],
1112+
"exc_type": "ValueError",
1113+
"exc_value": "original",
1114+
"exceptions": [],
1115+
},
1116+
{
1117+
"is_cause": False,
1118+
"frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}],
1119+
"exc_type": "RuntimeError",
1120+
"exc_value": "wrapped",
1121+
"exceptions": [],
1122+
},
1123+
]
1124+
result = _format_error_detail(error_detail)
1125+
assert result is not None
1126+
assert "direct cause" in result
1127+
assert "ValueError: original" in result
1128+
assert "RuntimeError: wrapped" in result
1129+
1130+
def test_exc_type_without_value(self):
1131+
from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail
1132+
1133+
error_detail = [
1134+
{
1135+
"is_cause": False,
1136+
"frames": [],
1137+
"exc_type": "StopIteration",
1138+
"exc_value": "",
1139+
}
1140+
]
1141+
result = _format_error_detail(error_detail)
1142+
assert result is not None
1143+
assert result.endswith("StopIteration")
1144+
1145+
def test_non_dict_items_are_stringified(self):
1146+
from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail
1147+
1148+
result = _format_error_detail(["unexpected string item"])
1149+
assert result is not None
1150+
assert "unexpected string item" in result
1151+
1152+
1153+
class TestBuildStructuredLogFields:
1154+
"""Unit tests for _build_log_fields."""
1155+
1156+
def test_filters_to_allowed_fields(self):
1157+
from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields
1158+
1159+
hit = {"event": "hello", "level": "info", "unknown_field": "should be dropped"}
1160+
result = _build_log_fields(hit)
1161+
assert "event" in result
1162+
assert "level" in result
1163+
assert "unknown_field" not in result
1164+
1165+
def test_error_detail_formatted_as_string(self):
1166+
from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields
1167+
1168+
hit = {
1169+
"event": "Task failed with exception",
1170+
"error_detail": [
1171+
{
1172+
"is_cause": False,
1173+
"frames": [{"filename": "/dag.py", "lineno": 10, "name": "run"}],
1174+
"exc_type": "RuntimeError",
1175+
"exc_value": "Woopsie.",
1176+
}
1177+
],
1178+
}
1179+
result = _build_log_fields(hit)
1180+
assert isinstance(result["error_detail"], str)
1181+
assert "RuntimeError: Woopsie." in result["error_detail"]
1182+
assert 'File "/dag.py", line 10, in run' in result["error_detail"]
1183+
1184+
def test_error_detail_dropped_when_empty(self):
1185+
from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields
1186+
1187+
hit = {"event": "msg", "error_detail": []}
1188+
result = _build_log_fields(hit)
1189+
assert "error_detail" not in result
1190+
1191+
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage only exists in Airflow 3+")
1192+
@elasticmock
1193+
def test_read_includes_error_detail_in_structured_message(self):
1194+
"""End-to-end: a hit with error_detail should surface it in the returned StructuredLogMessage."""
1195+
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
1196+
1197+
local_log_location = "local/log/location"
1198+
handler = ElasticsearchTaskHandler(
1199+
base_log_folder=local_log_location,
1200+
end_of_log_mark="end_of_log\n",
1201+
write_stdout=False,
1202+
json_format=False,
1203+
json_fields="asctime,filename,lineno,levelname,message,exc_text",
1204+
)
1205+
1206+
es = elasticsearch.Elasticsearch("http://localhost:9200")
1207+
log_id = "test_dag-test_task-test_run--1-1"
1208+
body = {
1209+
"event": "Task failed with exception",
1210+
"log_id": log_id,
1211+
"offset": 1,
1212+
"error_detail": [
1213+
{
1214+
"is_cause": False,
1215+
"frames": [
1216+
{"filename": "/opt/airflow/dags/fail.py", "lineno": 13, "name": "log_and_raise"}
1217+
],
1218+
"exc_type": "RuntimeError",
1219+
"exc_value": "Woopsie. Something went wrong.",
1220+
}
1221+
],
1222+
}
1223+
es.index(index="test_index", doc_type="log", body=body, id=1)
1224+
1225+
# Patch the IO layer to return our fake document
1226+
mock_hit_dict = body.copy()
1227+
1228+
from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit
1229+
1230+
mock_hit = Hit({"_source": mock_hit_dict})
1231+
mock_response = mock.MagicMock(spec=ElasticSearchResponse)
1232+
mock_response.hits = [mock_hit]
1233+
mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
1234+
mock_response.__bool__ = mock.Mock(return_value=True)
1235+
mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
1236+
1237+
with mock.patch.object(handler.io, "_es_read", return_value=mock_response):
1238+
with mock.patch.object(handler.io, "_group_logs_by_host", return_value={"localhost": [mock_hit]}):
1239+
# Build StructuredLogMessages
1240+
from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields
1241+
from airflow.utils.log.file_task_handler import StructuredLogMessage
1242+
1243+
fields = _build_log_fields(mock_hit.to_dict())
1244+
msg = StructuredLogMessage(**fields)
1245+
1246+
assert msg.event == "Task failed with exception"
1247+
assert hasattr(msg, "error_detail")
1248+
assert "RuntimeError: Woopsie. Something went wrong." in msg.error_detail
1249+
assert 'File "/opt/airflow/dags/fail.py", line 13, in log_and_raise' in msg.error_detail

providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,45 @@
5858

5959
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
6060
LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""}
61-
TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]
61+
TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger", "error_detail"]
62+
63+
64+
def _format_error_detail(error_detail: Any) -> str | None:
65+
"""Render the structured ``error_detail`` written by the Airflow 3 supervisor as a traceback string."""
66+
if not error_detail:
67+
return None
68+
if not isinstance(error_detail, list):
69+
return str(error_detail)
70+
71+
lines: list[str] = ["Traceback (most recent call last):"]
72+
for exc_info in error_detail:
73+
if not isinstance(exc_info, dict):
74+
lines.append(str(exc_info))
75+
continue
76+
if exc_info.get("is_cause"):
77+
lines.append("\nThe above exception was the direct cause of the following exception:\n")
78+
lines.append("Traceback (most recent call last):")
79+
for frame in exc_info.get("frames", []):
80+
lines.append(
81+
f' File "{frame.get("filename", "<unknown>")}", line {frame.get("lineno", "?")}, in {frame.get("name", "<unknown>")}'
82+
)
83+
exc_type = exc_info.get("exc_type", "")
84+
exc_value = exc_info.get("exc_value", "")
85+
if exc_type:
86+
lines.append(f"{exc_type}: {exc_value}" if exc_value else exc_type)
87+
return "\n".join(lines)
88+
89+
90+
def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]:
91+
"""Filter an OpenSearch hit to ``TASK_LOG_FIELDS``, formatting ``error_detail`` as a plain-text traceback."""
92+
fields = {k: v for k, v in hit_dict.items() if k.lower() in TASK_LOG_FIELDS}
93+
if "error_detail" in fields:
94+
formatted = _format_error_detail(fields["error_detail"])
95+
if formatted:
96+
fields["error_detail"] = formatted
97+
else:
98+
fields.pop("error_detail")
99+
return fields
62100

63101

64102
def getattr_nested(obj, item, default):
@@ -416,9 +454,7 @@ def concat_logs(hits: list[Hit]):
416454

417455
# Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects
418456
message = header + [
419-
StructuredLogMessage(
420-
**{k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS}
421-
)
457+
StructuredLogMessage(**_build_log_fields(hit.to_dict()))
422458
for hits in logs_by_host.values()
423459
for hit in hits
424460
]

0 commit comments

Comments
 (0)