diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py index 74cb6dc196634..72884cfbde7b6 100644 --- a/shared/logging/src/airflow_shared/logging/structlog.py +++ b/shared/logging/src/airflow_shared/logging/structlog.py @@ -26,7 +26,7 @@ import sys import weakref from collections.abc import Callable, Iterable, Mapping, Sequence -from functools import cache, cached_property, partial +from functools import cache, cached_property from pathlib import Path from types import ModuleType from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast @@ -184,6 +184,56 @@ def meth(self: Any, event: str, *args: Any, **kw: Any) -> Any: ) +_NAMESPACE_LEVEL_SEPARATORS = re.compile(r"[\s,]+") + + +def parse_namespace_log_levels(value: str | Mapping[str, str] | None) -> dict[str, int]: + """ + Parse the namespace logging levels configuration into per-logger levels. + + Real callers always pass a string with a series of ``=`` + pairs separated by whitespaces and/or commas, or *None* if the configuration + is not set. See documentation on configuration for details. + + Parsing is best-effort. Invalid entries are skipped with an ERROR level log + message. + + An already-split ``Mapping`` of logger name to level name is also accepted + as a convenience for programmatic (test) callers; it is trusted and merely + resolved to numeric levels without validation. + + :meta private: + """ + if value is None: + return {} + if not isinstance(value, str): + return {name: NAME_TO_LEVEL[level.lower()] for name, level in value.items()} + + levels: dict[str, int] = {} + errors: list[str] = [] + for entry in _NAMESPACE_LEVEL_SEPARATORS.split(value.strip()): + if not entry: + continue + logger_name, sep, level_name = entry.partition("=") + if not sep: + errors.append(f"malformed entry {entry!r}, expected '='") + continue + if not (logger_name := logger_name.strip()): + errors.append(f"malformed entry {entry!r}, logger name is empty") + continue + try: + levels[logger_name] = NAME_TO_LEVEL[(level_name := level_name.strip()).lower()] + except KeyError: + errors.append( + f"invalid level {level_name!r} for logger {logger_name!r}, " + f"expected one of: {', '.join(sorted(NAME_TO_LEVEL))}" + ) + + for error in errors: + log.error("Ignoring invalid namespace_levels entry: %s", error) + return levels + + def make_filtering_logger() -> Callable[..., BindableLogger]: def maker(logger: WrappedLogger, *args, **kwargs): # If the logger is a NamedBytesLogger/NamedWriteLogger (an Airflow specific subclass) then @@ -555,21 +605,7 @@ def is_atty(): extra_processors = extra_processors or () PER_LOGGER_LEVELS[""] = NAME_TO_LEVEL[log_level.lower()] - - # Extract per-logger-tree levels and set them - if isinstance(namespace_log_levels, str): - log_from_level = partial(re.compile(r"\s*=\s*").split, maxsplit=2) - namespace_log_levels = { - log: level for log, level in map(log_from_level, re.split(r"[\s,]+", namespace_log_levels)) - } - if namespace_log_levels: - for log, level in namespace_log_levels.items(): - try: - loglevel = NAME_TO_LEVEL[level.lower()] - except KeyError: - raise ValueError(f"Invalid log level for logger {log!r}: {level!r}") from None - else: - PER_LOGGER_LEVELS[log] = loglevel + PER_LOGGER_LEVELS.update(parse_namespace_log_levels(namespace_log_levels)) shared_pre_chain, for_stdlib, for_structlog = structlog_processors( json_output, diff --git a/shared/logging/tests/logging/test_structlog.py b/shared/logging/tests/logging/test_structlog.py index a723467c3d429..ca85bb4f20e81 100644 --- a/shared/logging/tests/logging/test_structlog.py +++ b/shared/logging/tests/logging/test_structlog.py @@ -34,7 +34,11 @@ from structlog.processors import CallsiteParameter from airflow_shared.logging import structlog as structlog_module -from airflow_shared.logging.structlog import configure_logging +from airflow_shared.logging.structlog import ( + NAME_TO_LEVEL, + configure_logging, + parse_namespace_log_levels, +) # We avoid the caplog fixture for most tests here; the main purpose of this file is to capture the # _rendered_ output of the tests to make sure it is correct. @@ -728,3 +732,57 @@ def test_named_write_logger_preserves_name(): assert NamedWriteLogger("my.logger").name == "my.logger" assert NamedWriteLogger().name is None + + +class TestParseNamespaceLogLevels: + """Unit tests for the best-effort ``namespace_levels`` parser.""" + + @pytest.mark.parametrize("value", (None, "", " ", " , , ")) + def test_blank_yields_no_overrides(self, value): + assert parse_namespace_log_levels(value) == {} + + def test_parses_string_pairs(self): + assert parse_namespace_log_levels("sqlalchemy=INFO sqlalchemy.engine=DEBUG") == { + "sqlalchemy": NAME_TO_LEVEL["info"], + "sqlalchemy.engine": NAME_TO_LEVEL["debug"], + } + + def test_accepts_commas_and_mixed_separators(self): + expected = {"a": NAME_TO_LEVEL["info"], "b": NAME_TO_LEVEL["debug"]} + assert parse_namespace_log_levels("a=INFO,b=DEBUG") == expected + assert parse_namespace_log_levels(" a=INFO ,, b=DEBUG ") == expected + + def test_level_names_are_case_insensitive(self): + assert parse_namespace_log_levels("a=info") == {"a": NAME_TO_LEVEL["info"]} + + def test_last_value_wins(self): + assert parse_namespace_log_levels("a=INFO a=ERROR") == {"a": NAME_TO_LEVEL["error"]} + + def test_accepts_mapping_input(self): + assert parse_namespace_log_levels({"my.logger": "warning"}) == { + "my.logger": NAME_TO_LEVEL["warning"], + } + + def test_valid_input_logs_nothing(self, caplog): + with caplog.at_level(logging.ERROR): + parse_namespace_log_levels("a=INFO") + assert not caplog.records + + def test_entry_without_equals_is_skipped(self, caplog): + with caplog.at_level(logging.ERROR): + result = parse_namespace_log_levels("sqlalchemy=INFO botocore") + assert result == {"sqlalchemy": NAME_TO_LEVEL["info"]} + assert "botocore" in caplog.text + assert "=" in caplog.text + + def test_empty_logger_name_is_skipped(self, caplog): + with caplog.at_level(logging.ERROR): + result = parse_namespace_log_levels("=INFO a=DEBUG") + assert result == {"a": NAME_TO_LEVEL["debug"]} + assert "logger name is empty" in caplog.text + + def test_unknown_level_is_skipped(self, caplog): + with caplog.at_level(logging.ERROR): + result = parse_namespace_log_levels("sqlalchemy=VERBOSE") + assert result == {} + assert "VERBOSE" in caplog.text