diff --git a/airflow-core/src/airflow/example_dags/example_failed_dag.py b/airflow-core/src/airflow/example_dags/example_failed_dag.py new file mode 100644 index 0000000000000..5b8cae60f271f --- /dev/null +++ b/airflow-core/src/airflow/example_dags/example_failed_dag.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Example DAG demonstrating task failure to generate exception traces in logs.""" + +from __future__ import annotations + +import pendulum + +from airflow.sdk import DAG, task + + +@task +def fail_task(): + """A task that always fails to generate error_detail.""" + raise RuntimeError("This is a test exception for stacktrace rendering") + + +with DAG( + "example_failed_dag", + schedule="@once", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=["example"], +) as dag: + fail_task() diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py index 5c07a04203a63..c17f5109f802f 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py @@ -101,3 +101,45 @@ def test_remote_logging_elasticsearch(self): assert any(self.expected_message in event for event in events), ( f"Expected task logs to contain {self.expected_message!r}, got events: {events}" ) + + def test_remote_logging_elasticsearch_error_detail(self): + """Test that log error_detail is retrieved correctly from Elasticsearch.""" + dag_id = "example_failed_dag" + task_id = "fail_task" + + self.airflow_client.un_pause_dag(dag_id) + resp = self.airflow_client.trigger_dag( + dag_id, + json={"logical_date": datetime.now(timezone.utc).isoformat()}, + ) + run_id = resp["dag_run_id"] + state = self.airflow_client.wait_for_dag_run(dag_id=dag_id, run_id=run_id) + + assert state == "failed" + + # Logs might take some time to appear in ES + task_logs_content = [] + for _ in range(self.max_retries): + task_logs_resp = self.airflow_client.get_task_logs( + dag_id=dag_id, + task_id=task_id, + run_id=run_id, + ) + task_logs_content = 
task_logs_resp.get("content", []) + # Search for the log entry with error_detail + if any("error_detail" in item for item in task_logs_content if isinstance(item, dict)): + break + time.sleep(self.retry_interval_in_seconds) + + error_entries = [ + item for item in task_logs_content if isinstance(item, dict) and "error_detail" in item + ] + assert len(error_entries) > 0, ( + f"Expected error_detail in logs, but none found. Logs: {task_logs_content}" + ) + + error_detail = error_entries[0]["error_detail"] + assert isinstance(error_detail, list), f"Expected error_detail to be a list, got {type(error_detail)}" + assert len(error_detail) > 0, "Expected error_detail to have at least one exception" + assert error_detail[0]["exc_type"] == "RuntimeError" + assert "This is a test exception for stacktrace rendering" in error_detail[0]["exc_value"] diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index 60080658ae91d..f063749e523bc 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -79,7 +79,56 @@ # not exist, the task handler should use the log_id_template attribute instead. 
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template") -TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"] +TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger", "error_detail", "message", "levelname"] + + +def _format_error_detail(error_detail: Any) -> str | None: + """Render the structured ``error_detail`` written by the Airflow 3 supervisor as a traceback string.""" + if not error_detail: + return None + if not isinstance(error_detail, list): + return str(error_detail) + + lines: list[str] = ["Traceback (most recent call last):"] + for exc_info in error_detail: + if not isinstance(exc_info, dict): + lines.append(str(exc_info)) + continue + for frame in exc_info.get("frames", []): + lines.append( + f' File "{frame.get("filename", "")}", line {frame.get("lineno", "?")}, in {frame.get("name", "")}' + ) + exc_type = exc_info.get("exc_type", "") + exc_value = exc_info.get("exc_value", "") + if exc_type: + lines.append(f"{exc_type}: {exc_value}" if exc_value else exc_type) + if exc_info.get("is_cause"): + lines.append("\nThe above exception was the direct cause of the following exception:\n") + lines.append("Traceback (most recent call last):") + return "\n".join(lines) + + +def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]: + """Filter an ES hit to ``TASK_LOG_FIELDS`` and ensure compatibility with StructuredLogMessage.""" + fields = {k: v for k, v in hit_dict.items() if k.lower() in TASK_LOG_FIELDS or k == "@timestamp"} + + # Map @timestamp to timestamp + if "@timestamp" in fields and "timestamp" not in fields: + fields["timestamp"] = fields.pop("@timestamp") + + # Map levelname to level + if "levelname" in fields and "level" not in fields: + fields["level"] = fields.pop("levelname") + + # Airflow 3 StructuredLogMessage requires 'event' + if "event" not in fields: + fields["event"] = fields.pop("message", "") + + # Clean up error_detail if it's empty + if "error_detail" in fields and not fields["error_detail"]: + 
fields.pop("error_detail") + return fields + VALID_ES_CONFIG_KEYS = set(inspect.signature(elasticsearch.Elasticsearch.__init__).parameters.keys()) # Remove `self` from the valid set of kwargs @@ -356,9 +405,7 @@ def _read( # Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects message = header + [ - StructuredLogMessage( - **{k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS} - ) + StructuredLogMessage(**_build_log_fields(hit.to_dict())) for hits in logs_by_host.values() for hit in hits ] @@ -668,7 +715,7 @@ def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMe # Structured log messages for hits in logs_by_host.values(): for hit in hits: - filtered = {k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS} + filtered = _build_log_fields(hit.to_dict()) message.append(json.dumps(filtered)) return header, message diff --git a/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py b/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py index aa8e91f812d42..89a12a3a39ba2 100644 --- a/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py +++ b/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py @@ -24,7 +24,7 @@ import elasticsearch import pytest -from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO +from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO, _render_log_id # The ES service hostname as defined in scripts/ci/docker-compose/integration-elasticsearch.yml ES_HOST = "http://elasticsearch:9200" @@ -101,8 +101,8 @@ def test_upload_and_read(self, tmp_json_log_file, ti): expected_messages = ["start", "processing", "end"] for expected, log_message in zip(expected_messages, log_messages): log_entry = json.loads(log_message) - assert "message" in log_entry - assert 
log_entry["message"] == expected + assert "event" in log_entry + assert log_entry["event"] == expected def test_read_missing_log(self, ti): """Verify that a missing log returns the expected error message. @@ -118,3 +118,30 @@ def test_read_missing_log(self, ti): assert log_source_info == [] assert len(log_messages) == 1 assert "not found in Elasticsearch" in log_messages[0] + + def test_read_error_detail_integration(self, ti): + """Verify that error_detail is correctly retrieved and formatted in integration tests.""" + # Manually index a log entry with error_detail + error_detail = [ + { + "is_cause": False, + "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno": 13, "name": "log_and_raise"}], + "exc_type": "RuntimeError", + "exc_value": "Woopsie. Something went wrong.", + } + ] + body = { + "event": "Task failed with exception", + "log_id": _render_log_id(self.elasticsearch_io.log_id_template, ti, ti.try_number), + "offset": 1, + "error_detail": error_detail, + } + self.elasticsearch_io.client.index(index=self.target_index, body=body) + self.elasticsearch_io.client.indices.refresh(index=self.target_index) + + log_source_info, log_messages = self.elasticsearch_io.read("", ti) + + assert len(log_messages) == 1 + log_entry = json.loads(log_messages[0]) + assert "error_detail" in log_entry + assert log_entry["error_detail"] == error_detail diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index 6698f27efa37a..8d6a0ba616ccc 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -1060,3 +1060,259 @@ def test_read_with_missing_log(self, mocked_count, ti): assert log_source_info == [] assert f"*** Log {log_id} not found in Elasticsearch" in log_messages[0] mocked_count.assert_called_once() + + def test_read_error_detail(self, 
ti): + """Verify that error_detail is correctly retrieved and formatted.""" + error_detail = [ + { + "is_cause": False, + "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno": 13, "name": "log_and_raise"}], + "exc_type": "RuntimeError", + "exc_value": "Woopsie. Something went wrong.", + } + ] + body = { + "event": "Task failed with exception", + "log_id": _render_log_id(self.elasticsearch_io.log_id_template, ti, ti.try_number), + "offset": 1, + "error_detail": error_detail, + } + + from airflow.providers.elasticsearch.log.es_response import Hit + + mock_hit = Hit({"_source": body}) + with ( + patch.object(self.elasticsearch_io, "_es_read") as mock_es_read, + patch.object( + self.elasticsearch_io, + "_group_logs_by_host", + return_value={"http://localhost:9200": [mock_hit]}, + ), + ): + mock_es_read.return_value = mock.MagicMock() + mock_es_read.return_value.hits = [mock_hit] + + log_source_info, log_messages = self.elasticsearch_io.read("", ti) + + assert len(log_messages) == 1 + log_entry = json.loads(log_messages[0]) + assert "error_detail" in log_entry + assert log_entry["error_detail"] == error_detail + + +# --------------------------------------------------------------------------- +# Tests for the error_detail helpers (issue #63736) +# --------------------------------------------------------------------------- + + +class TestFormatErrorDetail: + """Unit tests for _format_error_detail.""" + + def test_returns_none_for_empty(self): + from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail + + assert _format_error_detail(None) is None + assert _format_error_detail([]) is None + + def test_returns_string_for_non_list(self): + from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail + + assert _format_error_detail("raw string") == "raw string" + + def test_formats_single_exception(self): + from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail + + error_detail = [ + { + 
"is_cause": False, + "frames": [ + {"filename": "/app/task.py", "lineno": 13, "name": "log_and_raise"}, + ], + "exc_type": "RuntimeError", + "exc_value": "Something went wrong.", + "exceptions": [], + "is_group": False, + } + ] + result = _format_error_detail(error_detail) + assert result is not None + assert "Traceback (most recent call last):" in result + assert 'File "/app/task.py", line 13, in log_and_raise' in result + assert "RuntimeError: Something went wrong." in result + + def test_formats_chained_exceptions(self): + from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail + + error_detail = [ + { + "is_cause": True, + "frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}], + "exc_type": "ValueError", + "exc_value": "original", + "exceptions": [], + }, + { + "is_cause": False, + "frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}], + "exc_type": "RuntimeError", + "exc_value": "wrapped", + "exceptions": [], + }, + ] + result = _format_error_detail(error_detail) + assert result is not None + assert "direct cause" in result + assert "ValueError: original" in result + assert "RuntimeError: wrapped" in result + + def test_exc_type_without_value(self): + from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail + + error_detail = [ + { + "is_cause": False, + "frames": [], + "exc_type": "StopIteration", + "exc_value": "", + } + ] + result = _format_error_detail(error_detail) + assert result is not None + assert result.endswith("StopIteration") + + def test_non_dict_items_are_stringified(self): + from airflow.providers.elasticsearch.log.es_task_handler import _format_error_detail + + result = _format_error_detail(["unexpected string item"]) + assert result is not None + assert "unexpected string item" in result + + +class TestBuildStructuredLogFields: + """Unit tests for _build_log_fields.""" + + def test_filters_to_allowed_fields(self): + from 
airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields + + hit = {"event": "hello", "level": "info", "unknown_field": "should be dropped"} + result = _build_log_fields(hit) + assert "event" in result + assert "level" in result + assert "unknown_field" not in result + + def test_message_mapped_to_event(self): + from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields + + hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"} + fields = _build_log_fields(hit) + assert fields["event"] == "plain message" + assert "message" not in fields # Ensure it is popped if used as event + + def test_message_preserved_if_event_exists(self): + from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields + + hit = {"event": "structured event", "message": "plain message"} + fields = _build_log_fields(hit) + assert fields["event"] == "structured event" + # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide with event + assert fields["message"] == "plain message" + + def test_levelname_mapped_to_level(self): + from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields + + hit = {"event": "msg", "levelname": "ERROR"} + result = _build_log_fields(hit) + assert result["level"] == "ERROR" + assert "levelname" not in result + + def test_at_timestamp_mapped_to_timestamp(self): + from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields + + hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"} + result = _build_log_fields(hit) + assert result["timestamp"] == "2024-01-01T00:00:00Z" + assert "@timestamp" not in result + + def test_error_detail_is_kept_as_list(self): + from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields + + error_detail = [ + { + "is_cause": False, + "frames": [{"filename": "/dag.py", "lineno": 10, "name": "run"}], + "exc_type": "RuntimeError", + "exc_value": "Woopsie.", + } + ] + hit = { + 
"event": "Task failed with exception", + "error_detail": error_detail, + } + result = _build_log_fields(hit) + assert result["error_detail"] == error_detail + + def test_error_detail_dropped_when_empty(self): + from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields + + hit = {"event": "msg", "error_detail": []} + result = _build_log_fields(hit) + assert "error_detail" not in result + + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage only exists in Airflow 3+") + @elasticmock + def test_read_includes_error_detail_in_structured_message(self): + """End-to-end: a hit with error_detail should surface it in the returned StructuredLogMessage.""" + from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler + + local_log_location = "local/log/location" + handler = ElasticsearchTaskHandler( + base_log_folder=local_log_location, + end_of_log_mark="end_of_log\n", + write_stdout=False, + json_format=False, + json_fields="asctime,filename,lineno,levelname,message,exc_text", + ) + + es = elasticsearch.Elasticsearch("http://localhost:9200") + log_id = "test_dag-test_task-test_run--1-1" + body = { + "event": "Task failed with exception", + "log_id": log_id, + "offset": 1, + "error_detail": [ + { + "is_cause": False, + "frames": [ + {"filename": "/opt/airflow/dags/fail.py", "lineno": 13, "name": "log_and_raise"} + ], + "exc_type": "RuntimeError", + "exc_value": "Woopsie. 
Something went wrong.", + } + ], + } + es.index(index="test_index", doc_type="log", body=body, id=1) + + # Patch the IO layer to return our fake document + mock_hit_dict = body.copy() + + from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit + + mock_hit = Hit({"_source": mock_hit_dict}) + mock_response = mock.MagicMock(spec=ElasticSearchResponse) + mock_response.hits = [mock_hit] + mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit])) + mock_response.__bool__ = mock.Mock(return_value=True) + mock_response.__getitem__ = mock.Mock(return_value=mock_hit) + + with mock.patch.object(handler.io, "_es_read", return_value=mock_response): + with mock.patch.object(handler.io, "_group_logs_by_host", return_value={"localhost": [mock_hit]}): + # Build StructuredLogMessages + from airflow.providers.elasticsearch.log.es_task_handler import _build_log_fields + from airflow.utils.log.file_task_handler import StructuredLogMessage + + fields = _build_log_fields(mock_hit.to_dict()) + msg = StructuredLogMessage(**fields) + + assert msg.event == "Task failed with exception" + assert hasattr(msg, "error_detail") + assert msg.error_detail == body["error_detail"] diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index c76980e51066b..05f0ff90cbf77 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -58,7 +58,55 @@ USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template") LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""} -TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"] +TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger", "error_detail", "message", "levelname"] + + +def _format_error_detail(error_detail: Any) -> str | None: + """Render the structured ``error_detail`` 
written by the Airflow 3 supervisor as a traceback string.""" + if not error_detail: + return None + if not isinstance(error_detail, list): + return str(error_detail) + + lines: list[str] = ["Traceback (most recent call last):"] + for exc_info in error_detail: + if not isinstance(exc_info, dict): + lines.append(str(exc_info)) + continue + for frame in exc_info.get("frames", []): + lines.append( + f' File "{frame.get("filename", "")}", line {frame.get("lineno", "?")}, in {frame.get("name", "")}' + ) + exc_type = exc_info.get("exc_type", "") + exc_value = exc_info.get("exc_value", "") + if exc_type: + lines.append(f"{exc_type}: {exc_value}" if exc_value else exc_type) + if exc_info.get("is_cause"): + lines.append("\nThe above exception was the direct cause of the following exception:\n") + lines.append("Traceback (most recent call last):") + return "\n".join(lines) + + +def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]: + """Filter an OpenSearch hit to ``TASK_LOG_FIELDS`` and ensure compatibility with StructuredLogMessage.""" + fields = {k: v for k, v in hit_dict.items() if k.lower() in TASK_LOG_FIELDS or k == "@timestamp"} + + # Map @timestamp to timestamp + if "@timestamp" in fields and "timestamp" not in fields: + fields["timestamp"] = fields.pop("@timestamp") + + # Map levelname to level + if "levelname" in fields and "level" not in fields: + fields["level"] = fields.pop("levelname") + + # Airflow 3 StructuredLogMessage requires 'event' + if "event" not in fields: + fields["event"] = fields.pop("message", "") + + # Clean up error_detail if it's empty + if "error_detail" in fields and not fields["error_detail"]: + fields.pop("error_detail") + return fields def getattr_nested(obj, item, default): @@ -416,9 +464,7 @@ def concat_logs(hits: list[Hit]): # Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects message = header + [ - StructuredLogMessage( - **{k: v for k, v in hit.to_dict().items() if k.lower() in 
TASK_LOG_FIELDS} - ) + StructuredLogMessage(**_build_log_fields(hit.to_dict())) for hits in logs_by_host.values() for hit in hits ] diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py index 4dc46c1d89b22..15aba25ae8bce 100644 --- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py +++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py @@ -568,3 +568,220 @@ def test_retrieve_config_keys(): # http_compress comes from config value assert "http_compress" in args_from_config assert "self" not in args_from_config + + +# --------------------------------------------------------------------------- +# Tests for the error_detail helpers (issue #63736) +# --------------------------------------------------------------------------- + + +class TestFormatErrorDetail: + """Unit tests for _format_error_detail.""" + + def test_returns_none_for_empty(self): + from airflow.providers.opensearch.log.os_task_handler import _format_error_detail + + assert _format_error_detail(None) is None + assert _format_error_detail([]) is None + + def test_returns_string_for_non_list(self): + from airflow.providers.opensearch.log.os_task_handler import _format_error_detail + + assert _format_error_detail("raw string") == "raw string" + + def test_formats_single_exception(self): + from airflow.providers.opensearch.log.os_task_handler import _format_error_detail + + error_detail = [ + { + "is_cause": False, + "frames": [ + {"filename": "/app/task.py", "lineno": 13, "name": "log_and_raise"}, + ], + "exc_type": "RuntimeError", + "exc_value": "Something went wrong.", + "exceptions": [], + "is_group": False, + } + ] + result = _format_error_detail(error_detail) + assert result is not None + assert "Traceback (most recent call last):" in result + assert 'File "/app/task.py", line 13, in log_and_raise' in result + assert "RuntimeError: Something went wrong." 
in result + + def test_formats_chained_exceptions(self): + from airflow.providers.opensearch.log.os_task_handler import _format_error_detail + + error_detail = [ + { + "is_cause": True, + "frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}], + "exc_type": "ValueError", + "exc_value": "original", + "exceptions": [], + }, + { + "is_cause": False, + "frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}], + "exc_type": "RuntimeError", + "exc_value": "wrapped", + "exceptions": [], + }, + ] + result = _format_error_detail(error_detail) + assert result is not None + assert "direct cause" in result + assert "ValueError: original" in result + assert "RuntimeError: wrapped" in result + + def test_exc_type_without_value(self): + from airflow.providers.opensearch.log.os_task_handler import _format_error_detail + + error_detail = [ + { + "is_cause": False, + "frames": [], + "exc_type": "StopIteration", + "exc_value": "", + } + ] + result = _format_error_detail(error_detail) + assert result is not None + assert result.endswith("StopIteration") + + def test_non_dict_items_are_stringified(self): + from airflow.providers.opensearch.log.os_task_handler import _format_error_detail + + result = _format_error_detail(["unexpected string item"]) + assert result is not None + assert "unexpected string item" in result + + +class TestBuildLogFields: + """Unit tests for _build_log_fields.""" + + def test_filters_to_allowed_fields(self): + from airflow.providers.opensearch.log.os_task_handler import _build_log_fields + + hit = {"event": "hello", "level": "info", "unknown_field": "should be dropped"} + result = _build_log_fields(hit) + assert "event" in result + assert "level" in result + assert "unknown_field" not in result + + def test_message_mapped_to_event(self): + from airflow.providers.opensearch.log.os_task_handler import _build_log_fields + + hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"} + fields = _build_log_fields(hit) + assert 
fields["event"] == "plain message" + assert "message" not in fields # Ensure it is popped if used as event + + def test_message_preserved_if_event_exists(self): + from airflow.providers.opensearch.log.os_task_handler import _build_log_fields + + hit = {"event": "structured event", "message": "plain message"} + fields = _build_log_fields(hit) + assert fields["event"] == "structured event" + # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide with event + assert fields["message"] == "plain message" + + def test_levelname_mapped_to_level(self): + from airflow.providers.opensearch.log.os_task_handler import _build_log_fields + + hit = {"event": "msg", "levelname": "ERROR"} + result = _build_log_fields(hit) + assert result["level"] == "ERROR" + assert "levelname" not in result + + def test_at_timestamp_mapped_to_timestamp(self): + from airflow.providers.opensearch.log.os_task_handler import _build_log_fields + + hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"} + result = _build_log_fields(hit) + assert result["timestamp"] == "2024-01-01T00:00:00Z" + assert "@timestamp" not in result + + def test_error_detail_is_kept_as_list(self): + from airflow.providers.opensearch.log.os_task_handler import _build_log_fields + + error_detail = [ + { + "is_cause": False, + "frames": [{"filename": "/dag.py", "lineno": 10, "name": "run"}], + "exc_type": "RuntimeError", + "exc_value": "Woopsie.", + } + ] + hit = { + "event": "Task failed with exception", + "error_detail": error_detail, + } + result = _build_log_fields(hit) + assert result["error_detail"] == error_detail + + def test_error_detail_dropped_when_empty(self): + from airflow.providers.opensearch.log.os_task_handler import _build_log_fields + + hit = {"event": "msg", "error_detail": []} + result = _build_log_fields(hit) + assert "error_detail" not in result + + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage only exists in Airflow 3+") + def 
test_read_includes_error_detail_in_structured_message(self): + """End-to-end: a hit with error_detail should surface it in the returned StructuredLogMessage.""" + from airflow.providers.opensearch.log.os_task_handler import OpensearchTaskHandler + + local_log_location = "local/log/location" + handler = OpensearchTaskHandler( + base_log_folder=local_log_location, + end_of_log_mark="end_of_log\n", + write_stdout=False, + json_format=False, + json_fields="asctime,filename,lineno,levelname,message,exc_text", + host="localhost", + port=9200, + username="admin", + password="password", + ) + + log_id = "test_dag-test_task-test_run--1-1" + body = { + "event": "Task failed with exception", + "log_id": log_id, + "offset": 1, + "error_detail": [ + { + "is_cause": False, + "frames": [ + {"filename": "/opt/airflow/dags/fail.py", "lineno": 13, "name": "log_and_raise"} + ], + "exc_type": "RuntimeError", + "exc_value": "Woopsie. Something went wrong.", + } + ], + } + + # Instead of firing up an OpenSearch client, we patch the IO and response class + mock_hit_dict = body.copy() + from airflow.providers.opensearch.log.os_response import Hit, OpensearchResponse + + mock_hit = Hit({"_source": mock_hit_dict}) + mock_response = mock.MagicMock(spec=OpensearchResponse) + mock_response.hits = [mock_hit] + mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit])) + mock_response.__bool__ = mock.Mock(return_value=True) + mock_response.__getitem__ = mock.Mock(return_value=mock_hit) + + with mock.patch.object(handler, "_os_read", return_value=mock_response): + with mock.patch.object(handler, "_group_logs_by_host", return_value={"localhost": [mock_hit]}): + from airflow.providers.opensearch.log.os_task_handler import _build_log_fields + from airflow.utils.log.file_task_handler import StructuredLogMessage + + fields = _build_log_fields(mock_hit.to_dict()) + msg = StructuredLogMessage(**fields) + + assert msg.event == "Task failed with exception" + assert hasattr(msg, "error_detail") + 
assert msg.error_detail == body["error_detail"]