Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 52 additions & 16 deletions shared/logging/src/airflow_shared/logging/structlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import sys
import weakref
from collections.abc import Callable, Iterable, Mapping, Sequence
from functools import cache, cached_property, partial
from functools import cache, cached_property
from pathlib import Path
from types import ModuleType
from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast
Expand Down Expand Up @@ -184,6 +184,56 @@ def meth(self: Any, event: str, *args: Any, **kw: Any) -> Any:
)


_NAMESPACE_LEVEL_SEPARATORS = re.compile(r"[\s,]+")


def parse_namespace_log_levels(value: str | Mapping[str, str] | None) -> dict[str, int]:
"""
Parse the namespace logging levels configuration into per-logger levels.

Real callers always pass a string with a series of ``<logger>=<level>``
pairs separated by whitespaces and/or commas, or *None* if the configuration
is not set. See documentation on configuration for details.

Parsing is best-effort. Invalid entries are skipped with an ERROR level log
message.

An already-split ``Mapping`` of logger name to level name is also accepted
as a convenience for programmatic (test) callers; it is trusted and merely
resolved to numeric levels without validation.

:meta private:
"""
if value is None:
return {}
if not isinstance(value, str):
return {name: NAME_TO_LEVEL[level.lower()] for name, level in value.items()}

levels: dict[str, int] = {}
errors: list[str] = []
for entry in _NAMESPACE_LEVEL_SEPARATORS.split(value.strip()):
if not entry:
continue
logger_name, sep, level_name = entry.partition("=")
if not sep:
errors.append(f"malformed entry {entry!r}, expected '<logger>=<level>'")
continue
if not (logger_name := logger_name.strip()):
errors.append(f"malformed entry {entry!r}, logger name is empty")
continue
try:
levels[logger_name] = NAME_TO_LEVEL[(level_name := level_name.strip()).lower()]
except KeyError:
errors.append(
f"invalid level {level_name!r} for logger {logger_name!r}, "
f"expected one of: {', '.join(sorted(NAME_TO_LEVEL))}"
)

for error in errors:
log.error("Ignoring invalid namespace_levels entry: %s", error)
return levels


def make_filtering_logger() -> Callable[..., BindableLogger]:
def maker(logger: WrappedLogger, *args, **kwargs):
# If the logger is a NamedBytesLogger/NamedWriteLogger (an Airflow specific subclass) then
Expand Down Expand Up @@ -555,21 +605,7 @@ def is_atty():
extra_processors = extra_processors or ()

PER_LOGGER_LEVELS[""] = NAME_TO_LEVEL[log_level.lower()]

# Extract per-logger-tree levels and set them
if isinstance(namespace_log_levels, str):
log_from_level = partial(re.compile(r"\s*=\s*").split, maxsplit=2)
namespace_log_levels = {
log: level for log, level in map(log_from_level, re.split(r"[\s,]+", namespace_log_levels))
}
if namespace_log_levels:
for log, level in namespace_log_levels.items():
try:
loglevel = NAME_TO_LEVEL[level.lower()]
except KeyError:
raise ValueError(f"Invalid log level for logger {log!r}: {level!r}") from None
else:
PER_LOGGER_LEVELS[log] = loglevel
PER_LOGGER_LEVELS.update(parse_namespace_log_levels(namespace_log_levels))

shared_pre_chain, for_stdlib, for_structlog = structlog_processors(
json_output,
Expand Down
60 changes: 59 additions & 1 deletion shared/logging/tests/logging/test_structlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@
from structlog.processors import CallsiteParameter

from airflow_shared.logging import structlog as structlog_module
from airflow_shared.logging.structlog import configure_logging
from airflow_shared.logging.structlog import (
NAME_TO_LEVEL,
configure_logging,
parse_namespace_log_levels,
)

# We avoid the caplog fixture for most tests here; the main purpose of this file is to capture the
# _rendered_ output of the tests to make sure it is correct.
Expand Down Expand Up @@ -728,3 +732,57 @@ def test_named_write_logger_preserves_name():

assert NamedWriteLogger("my.logger").name == "my.logger"
assert NamedWriteLogger().name is None


class TestParseNamespaceLogLevels:
"""Unit tests for the best-effort ``namespace_levels`` parser."""

@pytest.mark.parametrize("value", (None, "", " ", " , , "))
def test_blank_yields_no_overrides(self, value):
assert parse_namespace_log_levels(value) == {}

def test_parses_string_pairs(self):
assert parse_namespace_log_levels("sqlalchemy=INFO sqlalchemy.engine=DEBUG") == {
"sqlalchemy": NAME_TO_LEVEL["info"],
"sqlalchemy.engine": NAME_TO_LEVEL["debug"],
}

def test_accepts_commas_and_mixed_separators(self):
expected = {"a": NAME_TO_LEVEL["info"], "b": NAME_TO_LEVEL["debug"]}
assert parse_namespace_log_levels("a=INFO,b=DEBUG") == expected
assert parse_namespace_log_levels(" a=INFO ,, b=DEBUG ") == expected

def test_level_names_are_case_insensitive(self):
assert parse_namespace_log_levels("a=info") == {"a": NAME_TO_LEVEL["info"]}

def test_last_value_wins(self):
assert parse_namespace_log_levels("a=INFO a=ERROR") == {"a": NAME_TO_LEVEL["error"]}

def test_accepts_mapping_input(self):
assert parse_namespace_log_levels({"my.logger": "warning"}) == {
"my.logger": NAME_TO_LEVEL["warning"],
}

def test_valid_input_logs_nothing(self, caplog):
with caplog.at_level(logging.ERROR):
parse_namespace_log_levels("a=INFO")
assert not caplog.records

def test_entry_without_equals_is_skipped(self, caplog):
with caplog.at_level(logging.ERROR):
result = parse_namespace_log_levels("sqlalchemy=INFO botocore")
assert result == {"sqlalchemy": NAME_TO_LEVEL["info"]}
assert "botocore" in caplog.text
assert "<logger>=<level>" in caplog.text

def test_empty_logger_name_is_skipped(self, caplog):
with caplog.at_level(logging.ERROR):
result = parse_namespace_log_levels("=INFO a=DEBUG")
assert result == {"a": NAME_TO_LEVEL["debug"]}
assert "logger name is empty" in caplog.text

def test_unknown_level_is_skipped(self, caplog):
with caplog.at_level(logging.ERROR):
result = parse_namespace_log_levels("sqlalchemy=VERBOSE")
assert result == {}
assert "VERBOSE" in caplog.text