diff --git a/docs/environment_variables.md b/docs/environment_variables.md index 0364deb061..7ff1543009 100644 --- a/docs/environment_variables.md +++ b/docs/environment_variables.md @@ -36,19 +36,20 @@ python -c "import httpx; httpx.get('http://example.com', trust_env=False)" ### `NO_PROXY` -Valid values: a comma-separated list of hostnames/urls +Valid values: a comma-separated list of hostnames/urls/ip ranges `NO_PROXY` disables the proxy for specific urls ```bash export HTTP_PROXY=http://my-external-proxy.com:1234 -export NO_PROXY=http://127.0.0.1,python-httpx.org +export NO_PROXY=http://127.0.0.0/8,python-httpx.org,::1 # As in the previous example, this request will be sent through the proxy python -c "import httpx; httpx.get('http://example.com')" # These requests will be sent directly, bypassing the proxy -python -c "import httpx; httpx.get('http://127.0.0.1:5000/my-api')" +python -c "import httpx; httpx.get('http://127.0.0.10:5000/my-api')" +python -c "import httpx; httpx.get('http://[::1]:5000/my-api')" python -c "import httpx; httpx.get('https://www.python-httpx.org')" ``` diff --git a/httpx/_client.py b/httpx/_client.py index 13cd933673..833ab8733f 100644 --- a/httpx/_client.py +++ b/httpx/_client.py @@ -46,7 +46,7 @@ TimeoutTypes, ) from ._urls import URL, QueryParams -from ._utils import URLPattern, get_environment_proxies +from ._utils import Pattern, build_url_pattern, get_environment_proxies if typing.TYPE_CHECKING: import ssl # pragma: no cover @@ -694,8 +694,8 @@ def __init__( limits=limits, transport=transport, ) - self._mounts: dict[URLPattern, BaseTransport | None] = { - URLPattern(key): None + self._mounts: dict[Pattern, BaseTransport | None] = { + build_url_pattern(key): None if proxy is None else self._init_proxy_transport( proxy, @@ -710,7 +710,7 @@ def __init__( } if mounts is not None: self._mounts.update( - {URLPattern(key): transport for key, transport in mounts.items()} + {build_url_pattern(key): transport for key, transport in mounts.items()} ) self._mounts = dict(sorted(self._mounts.items())) @@ -1409,8 +1409,8 @@ def __init__( transport=transport, ) - self._mounts: dict[URLPattern, AsyncBaseTransport | None] = { - URLPattern(key): None + self._mounts: dict[Pattern, AsyncBaseTransport | None] = { + build_url_pattern(key): None if proxy is None else self._init_proxy_transport( proxy, @@ -1425,7 +1425,7 @@ def __init__( } if mounts is not None: self._mounts.update( - {URLPattern(key): transport for key, transport in mounts.items()} + {build_url_pattern(key): transport for key, transport in mounts.items()} ) self._mounts = dict(sorted(self._mounts.items())) diff --git a/httpx/_utils.py b/httpx/_utils.py index 7fe827da4d..b492e5b099 100644 --- a/httpx/_utils.py +++ b/httpx/_utils.py @@ -4,6 +4,7 @@ import os import re import typing +from abc import abstractmethod from urllib.request import getproxies from ._types import PrimitiveData @@ -67,7 +68,13 @@ def get_environment_proxies() -> dict[str, str | None]: elif is_ipv4_hostname(hostname): mounts[f"all://{hostname}"] = None elif is_ipv6_hostname(hostname): - mounts[f"all://[{hostname}]"] = None + if "/" in hostname: + CIDR = hostname.split("/") + hostname = f"{CIDR[0]}" + subnet = f"/{CIDR[1]}" + else: + subnet = "" + mounts[f"all://[{hostname}]{subnet}"] = None elif hostname.lower() == "localhost": mounts[f"all://{hostname}"] = None else: @@ -117,24 +124,41 @@ def peek_filelike_length(stream: typing.Any) -> int | None: return length -class URLPattern: +class Pattern(typing.Protocol): + @abstractmethod + def matches(self, other: URL) -> bool: + """this method should never be accessed""" + + @property + @abstractmethod + def priority(self) -> tuple[int, int, int]: + """this property should never be accessed""" + + def __lt__(self, other: Pattern) -> bool: + """this method should never be accessed""" + + def __eq__(self, other: typing.Any) -> bool: + """this method should never be accessed""" + + +class WildcardURLPattern(Pattern): """ A utility class currently used for making lookups against proxy keys... # Wildcard matching... - >>> pattern = URLPattern("all://") + >>> pattern = WildcardURLPattern("all://") >>> pattern.matches(httpx.URL("http://example.com")) True # Witch scheme matching... - >>> pattern = URLPattern("https://") + >>> pattern = WildcardURLPattern("https://") >>> pattern.matches(httpx.URL("https://example.com")) True >>> pattern.matches(httpx.URL("http://example.com")) False # With domain matching... - >>> pattern = URLPattern("https://example.com") + >>> pattern = WildcardURLPattern("https://example.com") >>> pattern.matches(httpx.URL("https://example.com")) True >>> pattern.matches(httpx.URL("http://example.com")) @@ -143,7 +167,7 @@ class URLPattern: False # Wildcard scheme, with domain matching... - >>> pattern = URLPattern("all://example.com") + >>> pattern = WildcardURLPattern("all://example.com") >>> pattern.matches(httpx.URL("https://example.com")) True >>> pattern.matches(httpx.URL("http://example.com")) @@ -152,7 +176,7 @@ class URLPattern: False # With port matching... - >>> pattern = URLPattern("https://example.com:1234") + >>> pattern = WildcardURLPattern("https://example.com:1234") >>> pattern.matches(httpx.URL("https://example.com:1234")) True >>> pattern.matches(httpx.URL("https://example.com")) @@ -219,11 +243,52 @@ def priority(self) -> tuple[int, int, int]: def __hash__(self) -> int: return hash(self.pattern) - def __lt__(self, other: URLPattern) -> bool: + def __lt__(self, other: Pattern) -> bool: + return self.priority < other.priority + + def __eq__(self, other: typing.Any) -> bool: + return isinstance(other, WildcardURLPattern) and self.pattern == other.pattern + + +class IPNetPattern(Pattern): + def __init__(self, ip_net: str) -> None: + try: + addr, range = ip_net.split("/", 1) + if addr[0] == "[" and addr[-1] == "]": + addr = addr[1:-1] + ip_net = f"{addr}/{range}" + except ValueError: + pass # not a range + self.net = ipaddress.ip_network(ip_net) + + def matches(self, other: URL) -> bool: + try: + return ipaddress.ip_address(other.host) in self.net + except ValueError: + return False + + @property + def priority(self) -> tuple[int, int, int]: + return -1, 0, 0 # higher priority than URLPatterns + + def __hash__(self) -> int: + return hash(self.net) + + def __lt__(self, other: Pattern) -> bool: return self.priority < other.priority def __eq__(self, other: typing.Any) -> bool: - return isinstance(other, URLPattern) and self.pattern == other.pattern + return isinstance(other, IPNetPattern) and self.net == other.net + + +def build_url_pattern(pattern: str) -> Pattern: + try: + proto, rest = pattern.split("://", 1) + if proto == "all" and "/" in rest: + return IPNetPattern(rest) + except ValueError: # covers .split() and IPNetPattern + pass + return WildcardURLPattern(pattern) def is_ipv4_hostname(hostname: str) -> bool: diff --git a/tests/test_utils.py b/tests/test_utils.py index f9c215f65a..01e3566b8a 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -6,7 +6,12 @@ import pytest import httpx -from httpx._utils import URLPattern, get_environment_proxies +from httpx._utils import ( + IPNetPattern, + WildcardURLPattern, + build_url_pattern, + get_environment_proxies, +) @pytest.mark.parametrize( @@ -100,6 +105,7 @@ def test_logging_redirect_chain(server, caplog): ({"no_proxy": "127.0.0.1"}, {"all://127.0.0.1": None}), ({"no_proxy": "192.168.0.0/16"}, {"all://192.168.0.0/16": None}), ({"no_proxy": "::1"}, {"all://[::1]": None}), + ({"no_proxy": "fe11::/16"}, {"all://[fe11::]/16": None}), ({"no_proxy": "localhost"}, {"all://localhost": None}), ({"no_proxy": "github.com"}, {"all://*github.com": None}), ({"no_proxy": ".github.com"}, {"all://*.github.com": None}), @@ -127,24 +133,58 @@ def test_get_environment_proxies(environment, proxies): ("http://", "https://example.com", False), ("all://", "https://example.com:123", True), ("", "https://example.com:123", True), + ("all://192.168.0.0/24", "http://192.168.0.1", True), + ("all://192.168.0.0/24", "https://192.168.1.1", False), + ("all://[2001:db8:abcd:0012::]/64", "http://[2001:db8:abcd:12::1]", True), + ("all://[2001:db8:abcd:0012::]/64", "http://[2001:db8:abcd:13::1]:8080", False), ], ) def test_url_matches(pattern, url, expected): - pattern = URLPattern(pattern) + pattern = build_url_pattern(pattern) assert pattern.matches(httpx.URL(url)) == expected +@pytest.mark.parametrize( + ["pattern", "url", "expected"], + [ + ("all://192.168.0.0/24", "http://192.168.0.1", True), + ("all://192.168.0.1", "http://192.168.0.1", True), + ("all://192.168.0.0/24", "foobar", False), + ], +) +def test_IPNetPattern(pattern, url, expected): + proto, rest = pattern.split("://", 1) + pattern = IPNetPattern(rest) + assert pattern.matches(httpx.URL(url)) == expected + + +def test_build_url_pattern(): + pattern1 = build_url_pattern("all://192.168.0.0/16") + pattern2 = build_url_pattern("all://192.168.0.0/16") + pattern3 = build_url_pattern("all://192.168.0.1") + assert isinstance(pattern1, IPNetPattern) + assert isinstance(pattern2, IPNetPattern) + assert isinstance(pattern3, WildcardURLPattern) + assert pattern1 == pattern2 + assert pattern2 != pattern3 + assert pattern1 < pattern3 + assert hash(pattern1) == hash(pattern2) + assert hash(pattern2) != hash(pattern3) + + def test_pattern_priority(): matchers = [ - URLPattern("all://"), - URLPattern("http://"), - URLPattern("http://example.com"), - URLPattern("http://example.com:123"), + build_url_pattern("all://"), + build_url_pattern("http://"), + build_url_pattern("http://example.com"), + build_url_pattern("http://example.com:123"), + build_url_pattern("all://192.168.0.0/16"), ] random.shuffle(matchers) assert sorted(matchers) == [ - URLPattern("http://example.com:123"), - URLPattern("http://example.com"), - URLPattern("http://"), - URLPattern("all://"), + build_url_pattern("all://192.168.0.0/16"), + build_url_pattern("http://example.com:123"), + build_url_pattern("http://example.com"), + build_url_pattern("http://"), + build_url_pattern("all://"), ]