Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/environment_variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,20 @@ python -c "import httpx; httpx.get('http://example.com', trust_env=False)"

### `NO_PROXY`

Valid values: a comma-separated list of hostnames/urls
Valid values: a comma-separated list of hostnames/urls/ip ranges

`NO_PROXY` disables the proxy for specific urls

```bash
export HTTP_PROXY=http://my-external-proxy.com:1234
export NO_PROXY=http://127.0.0.1,python-httpx.org
export NO_PROXY=http://127.0.0.0/8,python-httpx.org,::1

# As in the previous example, this request will be sent through the proxy
python -c "import httpx; httpx.get('http://example.com')"

# These requests will be sent directly, bypassing the proxy
python -c "import httpx; httpx.get('http://127.0.0.1:5000/my-api')"
python -c "import httpx; httpx.get('http://127.0.0.10:5000/my-api')"
python -c "import httpx; httpx.get('http://[::1]:5000/my-api')"
python -c "import httpx; httpx.get('https://www.python-httpx.org')"
```

Expand Down
14 changes: 7 additions & 7 deletions httpx/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
TimeoutTypes,
)
from ._urls import URL, QueryParams
from ._utils import URLPattern, get_environment_proxies
from ._utils import Pattern, build_url_pattern, get_environment_proxies

if typing.TYPE_CHECKING:
import ssl # pragma: no cover
Expand Down Expand Up @@ -694,8 +694,8 @@ def __init__(
limits=limits,
transport=transport,
)
self._mounts: dict[URLPattern, BaseTransport | None] = {
URLPattern(key): None
self._mounts: dict[Pattern, BaseTransport | None] = {
build_url_pattern(key): None
if proxy is None
else self._init_proxy_transport(
proxy,
Expand All @@ -710,7 +710,7 @@ def __init__(
}
if mounts is not None:
self._mounts.update(
{URLPattern(key): transport for key, transport in mounts.items()}
{build_url_pattern(key): transport for key, transport in mounts.items()}
)

self._mounts = dict(sorted(self._mounts.items()))
Expand Down Expand Up @@ -1409,8 +1409,8 @@ def __init__(
transport=transport,
)

self._mounts: dict[URLPattern, AsyncBaseTransport | None] = {
URLPattern(key): None
self._mounts: dict[Pattern, AsyncBaseTransport | None] = {
build_url_pattern(key): None
if proxy is None
else self._init_proxy_transport(
proxy,
Expand All @@ -1425,7 +1425,7 @@ def __init__(
}
if mounts is not None:
self._mounts.update(
{URLPattern(key): transport for key, transport in mounts.items()}
{build_url_pattern(key): transport for key, transport in mounts.items()}
)
self._mounts = dict(sorted(self._mounts.items()))

Expand Down
83 changes: 74 additions & 9 deletions httpx/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import re
import typing
from abc import abstractmethod
from urllib.request import getproxies

from ._types import PrimitiveData
Expand Down Expand Up @@ -67,7 +68,13 @@ def get_environment_proxies() -> dict[str, str | None]:
elif is_ipv4_hostname(hostname):
mounts[f"all://{hostname}"] = None
elif is_ipv6_hostname(hostname):
mounts[f"all://[{hostname}]"] = None
if "/" in hostname:
CIDR = hostname.split("/")
hostname = f"{CIDR[0]}"
subnet = f"/{CIDR[1]}"
else:
subnet = ""
mounts[f"all://[{hostname}]{subnet}"] = None
elif hostname.lower() == "localhost":
mounts[f"all://{hostname}"] = None
else:
Expand Down Expand Up @@ -117,24 +124,41 @@ def peek_filelike_length(stream: typing.Any) -> int | None:
return length


class URLPattern:
class Pattern(typing.Protocol):
@abstractmethod
def matches(self, other: URL) -> bool:
"""this method should never be accessed"""

@property
@abstractmethod
def priority(self) -> tuple[int, int, int]:
"""this property should never be accessed"""

def __lt__(self, other: Pattern) -> bool:
"""this method should never be accessed"""

def __eq__(self, other: typing.Any) -> bool:
"""this method should never be accessed"""


class WildcardURLPattern(Pattern):
"""
A utility class currently used for making lookups against proxy keys...

# Wildcard matching...
>>> pattern = URLPattern("all://")
>>> pattern = WildcardURLPattern("all://")
>>> pattern.matches(httpx.URL("http://example.com"))
True

# Witch scheme matching...
>>> pattern = URLPattern("https://")
>>> pattern = WildcardURLPattern("https://")
>>> pattern.matches(httpx.URL("https://example.com"))
True
>>> pattern.matches(httpx.URL("http://example.com"))
False

# With domain matching...
>>> pattern = URLPattern("https://example.com")
>>> pattern = WildcardURLPattern("https://example.com")
>>> pattern.matches(httpx.URL("https://example.com"))
True
>>> pattern.matches(httpx.URL("http://example.com"))
Expand All @@ -143,7 +167,7 @@ class URLPattern:
False

# Wildcard scheme, with domain matching...
>>> pattern = URLPattern("all://example.com")
>>> pattern = WildcardURLPattern("all://example.com")
>>> pattern.matches(httpx.URL("https://example.com"))
True
>>> pattern.matches(httpx.URL("http://example.com"))
Expand All @@ -152,7 +176,7 @@ class URLPattern:
False

# With port matching...
>>> pattern = URLPattern("https://example.com:1234")
>>> pattern = WildcardURLPattern("https://example.com:1234")
>>> pattern.matches(httpx.URL("https://example.com:1234"))
True
>>> pattern.matches(httpx.URL("https://example.com"))
Expand Down Expand Up @@ -219,11 +243,52 @@ def priority(self) -> tuple[int, int, int]:
def __hash__(self) -> int:
return hash(self.pattern)

def __lt__(self, other: URLPattern) -> bool:
def __lt__(self, other: Pattern) -> bool:
return self.priority < other.priority

def __eq__(self, other: typing.Any) -> bool:
return isinstance(other, WildcardURLPattern) and self.pattern == other.pattern


class IPNetPattern(Pattern):
def __init__(self, ip_net: str) -> None:
try:
addr, range = ip_net.split("/", 1)
if addr[0] == "[" and addr[-1] == "]":
addr = addr[1:-1]
ip_net = f"{addr}/{range}"
except ValueError:
pass # not a range
self.net = ipaddress.ip_network(ip_net)

def matches(self, other: URL) -> bool:
try:
return ipaddress.ip_address(other.host) in self.net
except ValueError:
return False

@property
def priority(self) -> tuple[int, int, int]:
return -1, 0, 0 # higher priority than URLPatterns

def __hash__(self) -> int:
return hash(self.net)

def __lt__(self, other: Pattern) -> bool:
return self.priority < other.priority

def __eq__(self, other: typing.Any) -> bool:
return isinstance(other, URLPattern) and self.pattern == other.pattern
return isinstance(other, IPNetPattern) and self.net == other.net


def build_url_pattern(pattern: str) -> Pattern:
try:
proto, rest = pattern.split("://", 1)
if proto == "all" and "/" in rest:
return IPNetPattern(rest)
except ValueError: # covers .split() and IPNetPattern
pass
return WildcardURLPattern(pattern)


def is_ipv4_hostname(hostname: str) -> bool:
Expand Down
60 changes: 50 additions & 10 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
import pytest

import httpx
from httpx._utils import URLPattern, get_environment_proxies
from httpx._utils import (
IPNetPattern,
WildcardURLPattern,
build_url_pattern,
get_environment_proxies,
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -100,6 +105,7 @@ def test_logging_redirect_chain(server, caplog):
({"no_proxy": "127.0.0.1"}, {"all://127.0.0.1": None}),
({"no_proxy": "192.168.0.0/16"}, {"all://192.168.0.0/16": None}),
({"no_proxy": "::1"}, {"all://[::1]": None}),
({"no_proxy": "fe11::/16"}, {"all://[fe11::]/16": None}),
({"no_proxy": "localhost"}, {"all://localhost": None}),
({"no_proxy": "github.com"}, {"all://*github.com": None}),
({"no_proxy": ".github.com"}, {"all://*.github.com": None}),
Expand Down Expand Up @@ -127,24 +133,58 @@ def test_get_environment_proxies(environment, proxies):
("http://", "https://example.com", False),
("all://", "https://example.com:123", True),
("", "https://example.com:123", True),
("all://192.168.0.0/24", "http://192.168.0.1", True),
("all://192.168.0.0/24", "https://192.168.1.1", False),
("all://[2001:db8:abcd:0012::]/64", "http://[2001:db8:abcd:12::1]", True),
("all://[2001:db8:abcd:0012::]/64", "http://[2001:db8:abcd:13::1]:8080", False),
],
)
def test_url_matches(pattern, url, expected):
pattern = URLPattern(pattern)
pattern = build_url_pattern(pattern)
assert pattern.matches(httpx.URL(url)) == expected


@pytest.mark.parametrize(
["pattern", "url", "expected"],
[
("all://192.168.0.0/24", "http://192.168.0.1", True),
("all://192.168.0.1", "http://192.168.0.1", True),
("all://192.168.0.0/24", "foobar", False),
],
)
def test_IPNetPattern(pattern, url, expected):
proto, rest = pattern.split("://", 1)
pattern = IPNetPattern(rest)
assert pattern.matches(httpx.URL(url)) == expected


def test_build_url_pattern():
pattern1 = build_url_pattern("all://192.168.0.0/16")
pattern2 = build_url_pattern("all://192.168.0.0/16")
pattern3 = build_url_pattern("all://192.168.0.1")
assert isinstance(pattern1, IPNetPattern)
assert isinstance(pattern2, IPNetPattern)
assert isinstance(pattern3, WildcardURLPattern)
assert pattern1 == pattern2
assert pattern2 != pattern3
assert pattern1 < pattern3
assert hash(pattern1) == hash(pattern2)
assert hash(pattern2) != hash(pattern3)


def test_pattern_priority():
matchers = [
URLPattern("all://"),
URLPattern("http://"),
URLPattern("http://example.com"),
URLPattern("http://example.com:123"),
build_url_pattern("all://"),
build_url_pattern("http://"),
build_url_pattern("http://example.com"),
build_url_pattern("http://example.com:123"),
build_url_pattern("all://192.168.0.0/16"),
]
random.shuffle(matchers)
assert sorted(matchers) == [
URLPattern("http://example.com:123"),
URLPattern("http://example.com"),
URLPattern("http://"),
URLPattern("all://"),
build_url_pattern("all://192.168.0.0/16"),
build_url_pattern("http://example.com:123"),
build_url_pattern("http://example.com"),
build_url_pattern("http://"),
build_url_pattern("all://"),
]