From 8611f81ec6e5b9d087f8061971009eaa41ca45e8 Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Fri, 19 Dec 2025 16:36:23 -0600 Subject: [PATCH 1/7] Add HTTP multipart transport for GraphQL subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds support for HTTP multipart transport, which allows GraphQL subscriptions to work over HTTP using the multipart response format. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- docs/code_examples/http_multipart_async.py | 37 ++ docs/modules/gql.rst | 1 + docs/modules/transport_http_multipart.rst | 7 + docs/transports/async_transports.rst | 1 + docs/transports/http_multipart.rst | 159 +++++ gql/transport/http_multipart_transport.py | 323 ++++++++++ tests/conftest.py | 1 + tests/test_http_multipart_transport.py | 698 +++++++++++++++++++++ 8 files changed, 1227 insertions(+) create mode 100644 docs/code_examples/http_multipart_async.py create mode 100644 docs/modules/transport_http_multipart.rst create mode 100644 docs/transports/http_multipart.rst create mode 100644 gql/transport/http_multipart_transport.py create mode 100644 tests/test_http_multipart_transport.py diff --git a/docs/code_examples/http_multipart_async.py b/docs/code_examples/http_multipart_async.py new file mode 100644 index 00000000..d6d6e372 --- /dev/null +++ b/docs/code_examples/http_multipart_async.py @@ -0,0 +1,37 @@ +import asyncio +import logging + +from gql import Client, gql +from gql.transport.http_multipart_transport import HTTPMultipartTransport + +logging.basicConfig(level=logging.INFO) + + +async def main(): + + transport = HTTPMultipartTransport(url="https://gql-book-server.fly.dev/graphql") + + # Using `async with` on the client will start a connection on the transport + # and provide a `session` variable to execute queries on this connection + async with Client( + transport=transport, + ) as session: + + # Request subscription + subscription = gql( + """ + subscription { + book { + title + author + } + } + """ + ) + + # Subscribe and receive streaming updates + async for result in session.subscribe(subscription): + print(f"Received: {result}") + + +asyncio.run(main()) diff --git a/docs/modules/gql.rst b/docs/modules/gql.rst index 035f196f..6937286e 100644 --- a/docs/modules/gql.rst +++ b/docs/modules/gql.rst @@ -29,6 +29,7 @@ Sub-Packages transport_common_adapters_aiohttp transport_common_adapters_websockets transport_exceptions + transport_http_multipart transport_phoenix_channel_websockets transport_requests transport_httpx diff --git a/docs/modules/transport_http_multipart.rst b/docs/modules/transport_http_multipart.rst new file mode 100644 index 00000000..0e91e0af --- /dev/null +++ b/docs/modules/transport_http_multipart.rst @@ -0,0 +1,7 @@ +gql.transport.http\_multipart\_transport module +=============================================== + +.. automodule:: gql.transport.http_multipart_transport + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/transports/async_transports.rst b/docs/transports/async_transports.rst index ba5ca136..7e81fd35 100644 --- a/docs/transports/async_transports.rst +++ b/docs/transports/async_transports.rst @@ -11,6 +11,7 @@ Async transports are transports which are using an underlying async library. The aiohttp httpx_async + http_multipart websockets aiohttp_websockets phoenix diff --git a/docs/transports/http_multipart.rst b/docs/transports/http_multipart.rst new file mode 100644 index 00000000..416f82c9 --- /dev/null +++ b/docs/transports/http_multipart.rst @@ -0,0 +1,159 @@ +.. _http_multipart_transport: + +HTTPMultipartTransport +====================== + +This transport implements GraphQL subscriptions over HTTP using the `multipart subscription protocol`_ +as implemented by Apollo GraphOS Router and other compatible servers. + +This provides an HTTP-based alternative to WebSocket transports for receiving streaming +subscription updates. It's particularly useful when: + +- WebSocket connections are not available or blocked by infrastructure +- You want to use standard HTTP with existing load balancers and proxies +- The backend implements the multipart subscription protocol + +Reference: :class:`gql.transport.http_multipart_transport.HTTPMultipartTransport` + +.. note:: + + This transport is specifically designed for GraphQL subscriptions. While it can handle + queries and mutations via the ``execute()`` method, standard HTTP transports like + :ref:`AIOHTTPTransport ` are more efficient for those operations. + +.. literalinclude:: ../code_examples/http_multipart_async.py + +How It Works +------------ + +The transport sends a standard HTTP POST request with an ``Accept`` header indicating +support for multipart responses: + +.. code-block:: text + + Accept: multipart/mixed;subscriptionSpec="1.0", application/json + +The server responds with a ``multipart/mixed`` content type and streams subscription +updates as separate parts in the response body. Each part contains a JSON payload +with GraphQL execution results. + +Protocol Details +---------------- + +**Message Format** + +Each message part follows this structure: + +.. code-block:: text + + --graphql + Content-Type: application/json + + {"payload": {"data": {...}, "errors": [...]}} + +**Heartbeats** + +Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the +connection alive. These are automatically filtered out by the transport. + +**Error Handling** + +The protocol distinguishes between two types of errors: + +- **GraphQL errors**: Returned within the ``payload`` property alongside data +- **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload + +**End of Stream** + +The subscription ends when the server sends the final boundary marker: + +.. code-block:: text + + --graphql-- + +Authentication +-------------- + +Authentication works the same as with :ref:`AIOHTTPTransport `. + +Using HTTP Headers +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL:SERVER_PORT/graphql', + headers={'Authorization': 'Bearer YOUR_TOKEN'} + ) + +Using HTTP Cookies +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + transport = HTTPMultipartTransport( + url=url, + cookies={"session_id": "your_session_cookie"} + ) + +Or use a cookie jar to save and reuse cookies: + +.. code-block:: python + + import aiohttp + + jar = aiohttp.CookieJar() + transport = HTTPMultipartTransport( + url=url, + client_session_args={'cookie_jar': jar} + ) + +Configuration +------------- + +Timeout Settings +^^^^^^^^^^^^^^^^ + +Set a timeout for the HTTP request: + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + timeout=30 # 30 second timeout + ) + +SSL Configuration +^^^^^^^^^^^^^^^^^ + +Control SSL certificate verification: + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + ssl=False # Disable SSL verification (not recommended for production) + ) + +Or provide a custom SSL context: + +.. code-block:: python + + import ssl + + ssl_context = ssl.create_default_context() + ssl_context.load_cert_chain('client.crt', 'client.key') + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + ssl=ssl_context + ) + +Limitations +----------- + +- This transport requires the server to implement the multipart subscription protocol +- Long-lived connections may be terminated by intermediate proxies or load balancers +- Some server configurations may not support HTTP/1.1 chunked transfer encoding required for streaming + +.. _multipart subscription protocol: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py new file mode 100644 index 00000000..81dd1e51 --- /dev/null +++ b/gql/transport/http_multipart_transport.py @@ -0,0 +1,323 @@ +""" +HTTP Multipart Transport for GraphQL Subscriptions + +This transport implements support for GraphQL subscriptions over HTTP using +the multipart subscription protocol as implemented by Apollo GraphOS Router +and other compatible servers. + +Reference: +https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol +""" + +import asyncio +import json +import logging +from ssl import SSLContext +from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union + +import aiohttp +from aiohttp.client_reqrep import Fingerprint +from aiohttp.helpers import BasicAuth +from aiohttp.typedefs import LooseCookies, LooseHeaders +from graphql import ExecutionResult +from multidict import CIMultiDictProxy + +from gql.graphql_request import GraphQLRequest +from gql.transport.async_transport import AsyncTransport +from gql.transport.common.aiohttp_closed_event import create_aiohttp_closed_event +from gql.transport.exceptions import ( + TransportAlreadyConnected, + TransportClosed, + TransportConnectionFailed, + TransportProtocolError, + TransportServerError, +) + +log = logging.getLogger(__name__) + + +class HTTPMultipartTransport(AsyncTransport): + """ + Async Transport for GraphQL subscriptions using the multipart subscription protocol. + + This transport sends GraphQL subscription queries via HTTP POST and receives + streaming multipart/mixed responses, where each part contains a JSON payload + with GraphQL execution results. This protocol is implemented by Apollo GraphOS + Router and other compatible servers. + """ + + def __init__( + self, + url: str, + headers: Optional[LooseHeaders] = None, + cookies: Optional[LooseCookies] = None, + auth: Optional[BasicAuth] = None, + ssl: Union[SSLContext, bool, Fingerprint] = True, + timeout: Optional[int] = None, + ssl_close_timeout: Optional[Union[int, float]] = 10, + json_serialize: Callable = json.dumps, + json_deserialize: Callable = json.loads, + client_session_args: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Initialize the HTTP Multipart transport. + + :param url: The GraphQL server URL (http or https) + :param headers: Dict of HTTP Headers + :param cookies: Dict of HTTP cookies + :param auth: BasicAuth object for HTTP authentication + :param ssl: SSL context or validation mode + :param timeout: Request timeout in seconds + :param ssl_close_timeout: Timeout for SSL connection close + :param json_serialize: JSON serializer function + :param json_deserialize: JSON deserializer function + :param client_session_args: Extra args for aiohttp.ClientSession + """ + self.url = url + self.headers = headers or {} + self.cookies = cookies + self.auth = auth + self.ssl = ssl + self.timeout = timeout + self.ssl_close_timeout = ssl_close_timeout + self.json_serialize = json_serialize + self.json_deserialize = json_deserialize + self.client_session_args = client_session_args or {} + + self.session: Optional[aiohttp.ClientSession] = None + self.response_headers: Optional[CIMultiDictProxy[str]] = None + + async def connect(self) -> None: + """Create an aiohttp ClientSession.""" + if self.session is not None: + raise TransportAlreadyConnected("Transport is already connected") + + client_session_args: Dict[str, Any] = { + "cookies": self.cookies, + "headers": self.headers, + "auth": self.auth, + "json_serialize": self.json_serialize, + } + + if self.timeout is not None: + client_session_args["timeout"] = aiohttp.ClientTimeout(total=self.timeout) + + client_session_args.update(self.client_session_args) + + log.debug("Connecting HTTP Multipart transport") + self.session = aiohttp.ClientSession(**client_session_args) + + async def close(self) -> None: + """Close the aiohttp session.""" + if self.session is not None: + log.debug("Closing HTTP Multipart transport") + + if ( + self.client_session_args + and self.client_session_args.get("connector_owner") is False + ): + log.debug("connector_owner is False -> not closing connector") + else: + closed_event = create_aiohttp_closed_event(self.session) + await self.session.close() + try: + await asyncio.wait_for(closed_event.wait(), self.ssl_close_timeout) + except asyncio.TimeoutError: + pass + + self.session = None + + async def subscribe( + self, + request: GraphQLRequest, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Execute a GraphQL subscription and yield results from multipart response. + + :param request: GraphQL request to execute + :yields: ExecutionResult objects as they arrive in the multipart stream + """ + if self.session is None: + raise TransportClosed("Transport is not connected") + + payload = request.payload + if log.isEnabledFor(logging.DEBUG): + log.debug(">>> %s", self.json_serialize(payload)) + + headers = { + "Content-Type": "application/json", + "Accept": ( + "multipart/mixed;boundary=graphql;" + "subscriptionSpec=1.0,application/json" + ), + } + + try: + # Make the POST request + async with self.session.post( + self.url, + json=payload, + headers=headers, + ssl=self.ssl, + ) as response: + # Save response headers + self.response_headers = response.headers + + # Check for errors + if response.status >= 400: + error_text = await response.text() + raise TransportServerError( + f"Server returned {response.status}: {error_text}", + response.status, + ) + + initial_content_type = response.headers.get("Content-Type", "") + if ( + ("multipart/mixed" not in initial_content_type) + or ("boundary=graphql" not in initial_content_type) + or ("subscriptionSpec=1.0" not in initial_content_type) + or ("application/json" not in initial_content_type) + ): + raise TransportProtocolError( + f"Unexpected content-type: {initial_content_type}. " + "Server may not support the multipart subscription protocol." + ) + + # Parse multipart response + async for result in self._parse_multipart_response(response): + yield result + + except (TransportServerError, TransportProtocolError): + # Let these exceptions propagate without wrapping + raise + except Exception as e: + raise TransportConnectionFailed(str(e)) from e + + async def _parse_multipart_response( + self, + response: aiohttp.ClientResponse, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Parse a multipart response stream and yield execution results. + + Uses aiohttp's built-in MultipartReader to handle the multipart protocol. + + :param response: The aiohttp response object + :yields: ExecutionResult objects + """ + # Use aiohttp's built-in multipart reader + reader = aiohttp.MultipartReader.from_response(response) + + # Iterate through each part in the multipart response + while True: + try: + part = await reader.next() + except Exception: + # reader.next() throws on empty parts at the end of the stream. + # (some servers may send this.) + # see: https://github.com/aio-libs/aiohttp/pull/11857 + # As an ugly workaround for now, we can check if we've reached + # EOF and assume this was the case. + if reader.at_eof(): + break + + # Otherwise, re-raise unexpected errors + raise # pragma: no cover + + if part is None: + # No more parts + break + + assert not isinstance( + part, aiohttp.MultipartReader + ), "Nested multipart parts are not supported in GraphQL subscriptions" + + result = await self._parse_multipart_part(part) + if result: + yield result + + async def _parse_multipart_part( + self, part: aiohttp.BodyPartReader + ) -> Optional[ExecutionResult]: + """ + Parse a single part from a multipart response. + + :param part: aiohttp BodyPartReader for the part + :return: ExecutionResult or None if part is empty/heartbeat + """ + # Verify the part has the correct content type + content_type = part.headers.get(aiohttp.hdrs.CONTENT_TYPE, "") + if not content_type.startswith("application/json"): + raise TransportProtocolError( + f"Unexpected part content-type: {content_type}. " + "Expected 'application/json'." + ) + + try: + # Read the part content as text + body = await part.text() + body = body.strip() + + if log.isEnabledFor(logging.DEBUG): + log.debug("<<< %s", body or "(empty body, skipping)") + + if not body: + return None + + # Parse JSON body using custom deserializer + data = self.json_deserialize(body) + + # Handle heartbeats - empty JSON objects + if not data: + log.debug("Received heartbeat, ignoring") + return None + + # The multipart subscription protocol wraps data in a "payload" property + if "payload" not in data: + log.warning("Invalid response: missing 'payload' field") + return None + + payload = data["payload"] + + # Check for transport-level errors (payload is null) + if payload is None: + # If there are errors, this is a transport-level error + errors = data.get("errors") + if errors: + error_messages = [ + error.get("message", "Unknown transport error") + for error in errors + ] + + for message in error_messages: + log.error(f"Transport error: {message}") + + raise TransportServerError("\n\n".join(error_messages)) + else: + # Null payload without errors - just skip this part + return None + + # Extract GraphQL data from payload + return ExecutionResult( + data=payload.get("data"), + errors=payload.get("errors"), + extensions=payload.get("extensions"), + ) + except json.JSONDecodeError as e: + log.warning( + f"Failed to parse JSON: {e}, body: {body[:100] if body else ''}" + ) + return None + + async def execute( + self, + request: GraphQLRequest, + ) -> ExecutionResult: + """ + :raises: NotImplementedError - This transport only supports subscriptions + """ + raise NotImplementedError( + "The HTTP multipart transport does not support queries or " + "mutations. Use HTTPTransport for queries and mutations, or use " + "subscribe() for subscriptions." + ) diff --git a/tests/conftest.py b/tests/conftest.py index cef561f7..9de910ed 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -128,6 +128,7 @@ async def ssl_aiohttp_server(): "gql.transport.appsync", "gql.transport.common.base", "gql.transport.httpx", + "gql.transport.http_multipart_transport", "gql.transport.phoenix_channel_websockets", "gql.transport.requests", "gql.transport.websockets", diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py new file mode 100644 index 00000000..12002d37 --- /dev/null +++ b/tests/test_http_multipart_transport.py @@ -0,0 +1,698 @@ +import asyncio +import json +from unittest.mock import AsyncMock, patch + +import pytest + +from gql import Client, gql +from gql.graphql_request import GraphQLRequest +from gql.transport.exceptions import ( + TransportAlreadyConnected, + TransportClosed, + TransportConnectionFailed, + TransportProtocolError, + TransportServerError, +) + +# Marking all tests in this file with the aiohttp marker +pytestmark = pytest.mark.aiohttp + +subscription_str = """ + subscription { + book { + title + author + } + } +""" + +book1 = {"title": "Book 1", "author": "Author 1"} +book2 = {"title": "Book 2", "author": "Author 2"} +book3 = {"title": "Book 3", "author": "Author 3"} + + +def create_multipart_response(books, *, separator="\r\n", include_heartbeat=False): + """Helper to create parts for a streamed response body.""" + parts = [] + + for idx, book in enumerate(books): + data = {"data": {"book": book}} + payload = {"payload": data} + + parts.append(( + f"--graphql{separator}" + f"Content-Type: application/json{separator}" + f"{separator}" + f"{json.dumps(payload)}{separator}" + )) # fmt: skip + + # Add heartbeat after first item if requested + if include_heartbeat and idx == 0: + parts.append(( + f"--graphql{separator}" + f"Content-Type: application/json{separator}" + f"{separator}" + f"{{}}{separator}" + )) # fmt: skip + + # Add end boundary + parts.append(f"--graphql--{separator}") + + return parts + + +@pytest.fixture +def multipart_server(aiohttp_server): + from aiohttp import web + + async def create_server( + parts, + *, + content_type=( + "multipart/mixed;boundary=graphql;subscriptionSpec=1.0,application/json" + ), + request_handler=lambda *args: None, + ): + async def handler(request): + request_handler(request) + response = web.StreamResponse() + response.headers["Content-Type"] = content_type + response.enable_chunked_encoding() + await response.prepare(request) + for part in parts: + await response.write(part.encode()) + await asyncio.sleep(0) # force the chunk to be written + await response.write_eof() + return response + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + return server + + return create_server + + +@pytest.mark.asyncio +async def test_http_multipart_subscription(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + def assert_response_headers(request): + # Verify the Accept header follows the spec + accept_header = request.headers["accept"] + assert "multipart/mixed" in accept_header + assert "boundary=graphql" in accept_header + assert "subscriptionSpec=1.0" in accept_header + assert "application/json" in accept_header + + parts = create_multipart_response([book1, book2]) + server = await multipart_server(parts, request_handler=assert_response_headers) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Heartbeats should be filtered out + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.asyncio +async def test_http_multipart_subscription_with_heartbeat(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1, book2], include_heartbeat=True) + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Heartbeats should be filtered out + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.aiohttp +@pytest.mark.asyncio +async def test_http_multipart_unsupported_content_type(aiohttp_server): + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Return text/html instead of application/json + return web.Response(text="

hello

", content_type="text/html") + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + with pytest.raises(TransportProtocolError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Unexpected content-type" in str(exc_info.value) + + +@pytest.mark.aiohttp +@pytest.mark.asyncio +async def test_http_multipart_server_error(aiohttp_server): + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + return web.Response(text="Internal Server Error", status=500) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + with pytest.raises(TransportServerError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "500: Internal Server Error" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_transport_level_error(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Transport error has null payload with errors at top level + error_response = { + "payload": None, + "errors": [{"message": "Transport connection failed"}], + } + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + f"{json.dumps(error_response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportServerError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Transport connection failed" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_graphql_errors(multipart_server): + from gql.transport.exceptions import TransportQueryError + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # GraphQL errors come inside the payload + response = { + "payload": { + "data": {"book": {**book1, "author": None}}, + "errors": [ + {"message": "could not fetch author", "path": ["book", "author"]} + ], + } + } + parts = [ + ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + # Client raises TransportQueryError when there are errors in the result + with pytest.raises(TransportQueryError) as exc_info: + async for result in session.subscribe(query): + pass + + # Verify error details + assert "could not fetch author" in str(exc_info.value).lower() + assert exc_info.value.data is not None + assert exc_info.value.data["book"]["author"] is None + # Verify we can still get data for the non-error fields + assert exc_info.value.data["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_execute_method(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1, book2]) + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + # execute() should raise NotImplementedError + with pytest.raises(NotImplementedError) as exc_info: + await session.execute(query) + + assert "does not support queries or mutations" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_transport_already_connected(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([]) + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + await transport.connect() + + with pytest.raises(TransportAlreadyConnected): + await transport.connect() + + await transport.close() + + +@pytest.mark.asyncio +async def test_http_multipart_transport_not_connected(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1]) + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + request = GraphQLRequest(query) + + with pytest.raises(TransportClosed): + async for result in transport.subscribe(request): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_execute_empty_response(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Return empty multipart response (no data parts) + parts = ["--graphql--\r\n"] + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + # execute() should raise NotImplementedError + with pytest.raises(NotImplementedError) as exc_info: + await session.execute(query) + + assert "does not support queries or mutations" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_newline_separator(multipart_server): + """Test that LF-only separators are rejected (spec requires CRLF).""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # The GraphQL over HTTP spec requires CRLF line endings in multipart responses + # https://github.com/graphql/graphql-over-http/blob/main/rfcs/IncrementalDelivery.md + parts = create_multipart_response([book1], separator="\n") + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + # Non-compliant multipart format (LF instead of CRLF) should fail + with pytest.raises(TransportConnectionFailed): + async for result in session.subscribe(query): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_transport_connection_failed_error(): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Use an invalid URL that will fail to connect + transport = HTTPMultipartTransport(url="http://invalid.local:-1/graphql", timeout=1) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + with pytest.raises(TransportConnectionFailed): + async for result in session.subscribe(query): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_connector_owner_false(multipart_server): + """Test closing transport with connector_owner=False.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1]) + server = await multipart_server(parts) + url = server.make_url("/") + + transport = HTTPMultipartTransport( + url=url, client_session_args={"connector_owner": False} + ) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + + +@pytest.mark.asyncio +async def test_http_multipart_ssl_close_timeout(multipart_server): + """Test SSL close timeout during transport close.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1], separator="\n") + server = await multipart_server(parts) + url = server.make_url("/") + + transport = HTTPMultipartTransport(url=url, ssl_close_timeout=0.001) + + await transport.connect() + + # Mock the closed event to timeout + with patch( + "gql.transport.http_multipart_transport.create_aiohttp_closed_event" + ) as mock_event: + mock_wait = AsyncMock() + mock_wait.side_effect = asyncio.TimeoutError() + mock_event.return_value.wait = mock_wait + + # Should handle timeout gracefully + await transport.close() + + +@pytest.mark.asyncio +async def test_http_multipart_malformed_json(multipart_server): + """Test handling of malformed JSON in multipart response.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + "{invalid json }\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip malformed parts + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_payload_null_no_errors(multipart_server): + """Test handling of null payload without errors.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Null payload but no errors + response = {"payload": None} + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + f"{json.dumps(response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Null payload without errors should return nothing + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_invalid_utf8(multipart_server): + """Test handling of invalid UTF-8 in multipart response.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + "\xff\xfe\r\n" # Contains invalid UTF-8 + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip invalid part + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_chunked_boundary_split(multipart_server): + """Test parsing when boundary is split across chunks.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = [ + "--gra", + ( + "phql\r\nContent-Type: application/json\r\n\r\n" + '{"payload": {"data": {"book": {"title": "Bo' + ), + 'ok 1"}}}}\r\n--graphql--\r\n', + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_wrong_part_content_type(multipart_server): + """Test that parts with wrong content-type raise an error.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Part with text/html instead of application/json + parts = [ + ("--graphql\r\n" "Content-Type: text/html\r\n" "\r\n" "

hello

\r\n"), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportProtocolError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Unexpected part content-type" in str(exc_info.value) + assert "text/html" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_response_headers(multipart_server): + """Test that response headers are captured in the transport.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1]) + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Verify response headers are captured + assert transport.response_headers is not None + assert "Content-Type" in transport.response_headers + assert "multipart/mixed" in transport.response_headers["Content-Type"] + + +@pytest.mark.asyncio +async def test_http_multipart_empty_body(multipart_server): + """Test part with empty body after stripping.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Part with only whitespace body + parts = [ + "--graphql\r\nContent-Type: application/json\r\n\r\n \r\n", + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_missing_payload_field(multipart_server): + """Test handling of response missing required 'payload' field.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + response = {"foo": "bar"} # No payload field! + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + f"{json.dumps(response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip invalid response and return no results + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_with_content_length_headers(multipart_server): + """Test multipart response with Content-Length headers (like real servers send).""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Simulate real server behavior: each part has Content-Length header + book1_payload = json.dumps({"payload": {"data": {"book": book1}}}) + book2_payload = json.dumps({"payload": {"data": {"book": book2}}}) + heartbeat_payload = "{}" + + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + f"Content-Length: {len(heartbeat_payload)}\r\n" + "\r\n" + f"{heartbeat_payload}\r\n" + ), + ( + "--graphql\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + f"Content-Length: {len(book1_payload)}\r\n" + "\r\n" + f"{book1_payload}\r\n" + ), + ( + "--graphql\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + f"Content-Length: {len(book2_payload)}\r\n" + "\r\n" + f"{book2_payload}\r\n" + ), + "--graphql\r\n", # Extra empty part like real servers + "--graphql--\r\n", # Final boundary + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should get 2 books (heartbeat and empty part filtered) + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" From 851ca37cc2f016d1cc23e8bc20328ac90e00ef7a Mon Sep 17 00:00:00 2001 From: Leszek Hanusz Date: Mon, 22 Dec 2025 22:45:10 +0100 Subject: [PATCH 2/7] Fix PyPy crash on invalid UTF-8 in multipart transport logs --- gql/transport/http_multipart_transport.py | 8 ++++-- tests/test_http_multipart_transport.py | 35 ++++++++++++++++++++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py index 81dd1e51..8ea31afa 100644 --- a/gql/transport/http_multipart_transport.py +++ b/gql/transport/http_multipart_transport.py @@ -259,7 +259,7 @@ async def _parse_multipart_part( body = body.strip() if log.isEnabledFor(logging.DEBUG): - log.debug("<<< %s", body or "(empty body, skipping)") + log.debug("<<< %s", ascii(body or "(empty body, skipping)")) if not body: return None @@ -305,9 +305,13 @@ async def _parse_multipart_part( ) except json.JSONDecodeError as e: log.warning( - f"Failed to parse JSON: {e}, body: {body[:100] if body else ''}" + f"Failed to parse JSON: {ascii(e)}, " + f"body: {ascii(body[:100]) if body else ''}" ) return None + except UnicodeDecodeError as e: + log.warning(f"Failed to decode part: {ascii(e)}") + return None async def execute( self, diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py index 12002d37..75bc7e1b 100644 --- a/tests/test_http_multipart_transport.py +++ b/tests/test_http_multipart_transport.py @@ -80,7 +80,10 @@ async def handler(request): response.enable_chunked_encoding() await response.prepare(request) for part in parts: - await response.write(part.encode()) + if isinstance(part, str): + await response.write(part.encode()) + else: + await response.write(part) await asyncio.sleep(0) # force the chunk to be written await response.write_eof() return response @@ -696,3 +699,33 @@ async def test_http_multipart_with_content_length_headers(multipart_server): assert len(results) == 2 assert results[0]["book"]["title"] == "Book 1" assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.asyncio +async def test_http_multipart_actually_invalid_utf8(multipart_server): + """Test handling of ACTUAL invalid UTF-8 bytes in multipart response.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # \\x80 is an invalid start byte in UTF-8 + parts = [ + ( + b"--graphql\r\n" + b"Content-Type: application/json; charset=utf-8\r\n" + b"\r\n" + b"\x80\x81\r\n" + ), + b"--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip invalid part and not crash + assert len(results) == 0 From a8630e5c980ecf214df2aa788e202f2615851f35 Mon Sep 17 00:00:00 2001 From: Leszek Hanusz Date: Tue, 23 Dec 2025 13:11:33 +0100 Subject: [PATCH 3/7] Merge HTTPMultipartTransport into AIOHTTPTransport --- ...c.py => aiohttp_multipart_subscription.py} | 4 +- docs/modules/gql.rst | 1 - docs/modules/transport_http_multipart.rst | 7 - docs/transports/aiohttp.rst | 76 +++- docs/transports/async_transports.rst | 1 - docs/transports/http_multipart.rst | 159 --------- gql/transport/aiohttp.py | 183 +++++++++- gql/transport/http_multipart_transport.py | 327 ------------------ tests/test_aiohttp.py | 37 +- ...transport.py => test_aiohttp_multipart.py} | 174 ++++------ 10 files changed, 326 insertions(+), 643 deletions(-) rename docs/code_examples/{http_multipart_async.py => aiohttp_multipart_subscription.py} (82%) delete mode 100644 docs/modules/transport_http_multipart.rst delete mode 100644 docs/transports/http_multipart.rst delete mode 100644 gql/transport/http_multipart_transport.py rename tests/{test_http_multipart_transport.py => test_aiohttp_multipart.py} (74%) diff --git a/docs/code_examples/http_multipart_async.py b/docs/code_examples/aiohttp_multipart_subscription.py similarity index 82% rename from docs/code_examples/http_multipart_async.py rename to docs/code_examples/aiohttp_multipart_subscription.py index d6d6e372..bb09a35c 100644 --- a/docs/code_examples/http_multipart_async.py +++ b/docs/code_examples/aiohttp_multipart_subscription.py @@ -2,14 +2,14 @@ import logging from gql import Client, gql -from gql.transport.http_multipart_transport import HTTPMultipartTransport +from gql.transport.aiohttp import AIOHTTPTransport logging.basicConfig(level=logging.INFO) async def main(): - transport = HTTPMultipartTransport(url="https://gql-book-server.fly.dev/graphql") + transport = AIOHTTPTransport(url="https://gql-book-server.fly.dev/graphql") # Using `async with` on the client will start a connection on the transport # and provide a `session` variable to execute queries on this connection diff --git a/docs/modules/gql.rst b/docs/modules/gql.rst index 6937286e..035f196f 100644 --- a/docs/modules/gql.rst +++ b/docs/modules/gql.rst @@ -29,7 +29,6 @@ Sub-Packages transport_common_adapters_aiohttp transport_common_adapters_websockets transport_exceptions - transport_http_multipart transport_phoenix_channel_websockets transport_requests transport_httpx diff --git a/docs/modules/transport_http_multipart.rst b/docs/modules/transport_http_multipart.rst deleted file mode 100644 index 0e91e0af..00000000 --- a/docs/modules/transport_http_multipart.rst +++ /dev/null @@ -1,7 +0,0 @@ -gql.transport.http\_multipart\_transport module -=============================================== - -.. automodule:: gql.transport.http_multipart_transport - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/transports/aiohttp.rst b/docs/transports/aiohttp.rst index b852108b..f8f93d64 100644 --- a/docs/transports/aiohttp.rst +++ b/docs/transports/aiohttp.rst @@ -7,15 +7,79 @@ This transport uses the `aiohttp`_ library and allows you to send GraphQL querie Reference: :class:`gql.transport.aiohttp.AIOHTTPTransport` -.. note:: +This transport supports both standard GraphQL operations (queries, mutations) and subscriptions. +Subscriptions are implemented using the `multipart subscription protocol`_ +as implemented by Apollo GraphOS Router and other compatible servers. - GraphQL subscriptions are not supported on the HTTP transport. - For subscriptions you should use a websockets transport: - :ref:`WebsocketsTransport ` or - :ref:`AIOHTTPWebsocketsTransport `. +This provides an HTTP-based alternative to WebSocket transports for receiving streaming +subscription updates. It's particularly useful when: + +- WebSocket connections are not available or blocked by infrastructure +- You want to use standard HTTP with existing load balancers and proxies +- The backend implements the multipart subscription protocol + +Queries +------- .. literalinclude:: ../code_examples/aiohttp_async.py +Subscriptions +------------- + +The transport sends a standard HTTP POST request with an ``Accept`` header indicating +support for multipart responses: + +.. code-block:: text + + Accept: multipart/mixed;subscriptionSpec="1.0", application/json + +The server responds with a ``multipart/mixed`` content type and streams subscription +updates as separate parts in the response body. Each part contains a JSON payload +with GraphQL execution results. + +.. literalinclude:: ../code_examples/aiohttp_multipart_subscription.py + +How It Works +^^^^^^^^^^^^ + +**Message Format** + +Each message part follows this structure: + +.. code-block:: text + + --graphql + Content-Type: application/json + + {"payload": {"data": {...}, "errors": [...]}} + +**Heartbeats** + +Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the +connection alive. These are automatically filtered out by the transport. + +**Error Handling** + +The protocol distinguishes between two types of errors: + +- **GraphQL errors**: Returned within the ``payload`` property alongside data +- **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload + +**End of Stream** + +The subscription ends when the server sends the final boundary marker: + +.. code-block:: text + + --graphql-- + +Limitations +^^^^^^^^^^^ + +- Subscriptions require the server to implement the multipart subscription protocol +- Long-lived connections may be terminated by intermediate proxies or load balancers +- Some server configurations may not support HTTP/1.1 chunked transfer encoding required for streaming + Authentication -------------- @@ -52,3 +116,5 @@ and you can save these cookies in a cookie jar to reuse them in a following conn .. _aiohttp: https://docs.aiohttp.org .. _issue 197: https://github.com/graphql-python/gql/issues/197 +.. _multipart subscription protocol: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol + diff --git a/docs/transports/async_transports.rst b/docs/transports/async_transports.rst index 7e81fd35..ba5ca136 100644 --- a/docs/transports/async_transports.rst +++ b/docs/transports/async_transports.rst @@ -11,7 +11,6 @@ Async transports are transports which are using an underlying async library. The aiohttp httpx_async - http_multipart websockets aiohttp_websockets phoenix diff --git a/docs/transports/http_multipart.rst b/docs/transports/http_multipart.rst deleted file mode 100644 index 416f82c9..00000000 --- a/docs/transports/http_multipart.rst +++ /dev/null @@ -1,159 +0,0 @@ -.. _http_multipart_transport: - -HTTPMultipartTransport -====================== - -This transport implements GraphQL subscriptions over HTTP using the `multipart subscription protocol`_ -as implemented by Apollo GraphOS Router and other compatible servers. - -This provides an HTTP-based alternative to WebSocket transports for receiving streaming -subscription updates. It's particularly useful when: - -- WebSocket connections are not available or blocked by infrastructure -- You want to use standard HTTP with existing load balancers and proxies -- The backend implements the multipart subscription protocol - -Reference: :class:`gql.transport.http_multipart_transport.HTTPMultipartTransport` - -.. note:: - - This transport is specifically designed for GraphQL subscriptions. While it can handle - queries and mutations via the ``execute()`` method, standard HTTP transports like - :ref:`AIOHTTPTransport ` are more efficient for those operations. - -.. literalinclude:: ../code_examples/http_multipart_async.py - -How It Works ------------- - -The transport sends a standard HTTP POST request with an ``Accept`` header indicating -support for multipart responses: - -.. code-block:: text - - Accept: multipart/mixed;subscriptionSpec="1.0", application/json - -The server responds with a ``multipart/mixed`` content type and streams subscription -updates as separate parts in the response body. Each part contains a JSON payload -with GraphQL execution results. - -Protocol Details ----------------- - -**Message Format** - -Each message part follows this structure: - -.. code-block:: text - - --graphql - Content-Type: application/json - - {"payload": {"data": {...}, "errors": [...]}} - -**Heartbeats** - -Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the -connection alive. These are automatically filtered out by the transport. - -**Error Handling** - -The protocol distinguishes between two types of errors: - -- **GraphQL errors**: Returned within the ``payload`` property alongside data -- **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload - -**End of Stream** - -The subscription ends when the server sends the final boundary marker: - -.. code-block:: text - - --graphql-- - -Authentication --------------- - -Authentication works the same as with :ref:`AIOHTTPTransport `. - -Using HTTP Headers -^^^^^^^^^^^^^^^^^^ - -.. code-block:: python - - transport = HTTPMultipartTransport( - url='https://SERVER_URL:SERVER_PORT/graphql', - headers={'Authorization': 'Bearer YOUR_TOKEN'} - ) - -Using HTTP Cookies -^^^^^^^^^^^^^^^^^^ - -.. code-block:: python - - transport = HTTPMultipartTransport( - url=url, - cookies={"session_id": "your_session_cookie"} - ) - -Or use a cookie jar to save and reuse cookies: - -.. code-block:: python - - import aiohttp - - jar = aiohttp.CookieJar() - transport = HTTPMultipartTransport( - url=url, - client_session_args={'cookie_jar': jar} - ) - -Configuration -------------- - -Timeout Settings -^^^^^^^^^^^^^^^^ - -Set a timeout for the HTTP request: - -.. code-block:: python - - transport = HTTPMultipartTransport( - url='https://SERVER_URL/graphql', - timeout=30 # 30 second timeout - ) - -SSL Configuration -^^^^^^^^^^^^^^^^^ - -Control SSL certificate verification: - -.. code-block:: python - - transport = HTTPMultipartTransport( - url='https://SERVER_URL/graphql', - ssl=False # Disable SSL verification (not recommended for production) - ) - -Or provide a custom SSL context: - -.. code-block:: python - - import ssl - - ssl_context = ssl.create_default_context() - ssl_context.load_cert_chain('client.crt', 'client.key') - - transport = HTTPMultipartTransport( - url='https://SERVER_URL/graphql', - ssl=ssl_context - ) - -Limitations ------------ - -- This transport requires the server to implement the multipart subscription protocol -- Long-lived connections may be terminated by intermediate proxies or load balancers -- Some server configurations may not support HTTP/1.1 chunked transfer encoding required for streaming - -.. _multipart subscription protocol: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol diff --git a/gql/transport/aiohttp.py b/gql/transport/aiohttp.py index ab26bd03..e4ce2bd6 100644 --- a/gql/transport/aiohttp.py +++ b/gql/transport/aiohttp.py @@ -16,6 +16,7 @@ ) import aiohttp +from aiohttp import BodyPartReader, MultipartReader from aiohttp.client_exceptions import ClientResponseError from aiohttp.client_reqrep import Fingerprint from aiohttp.helpers import BasicAuth @@ -421,12 +422,186 @@ async def execute_batch( except Exception as e: raise TransportConnectionFailed(str(e)) from e - def subscribe( + async def subscribe( self, request: GraphQLRequest, ) -> AsyncGenerator[ExecutionResult, None]: - """Subscribe is not supported on HTTP. + """Execute a GraphQL subscription and yield results from multipart response. - :meta private: + :param request: GraphQL request to execute + :yields: ExecutionResult objects as they arrive in the multipart stream """ - raise NotImplementedError(" The HTTP transport does not support subscriptions") + if self.session is None: + raise TransportClosed("Transport is not connected") + + post_args = self._prepare_request(request) + + # Add headers for multipart subscription + headers = post_args.get("headers", {}) + headers.update( + { + "Content-Type": "application/json", + "Accept": ( + "multipart/mixed;boundary=graphql;" + "subscriptionSpec=1.0,application/json" + ), + } + ) + post_args["headers"] = headers + + try: + async with self.session.post(self.url, ssl=self.ssl, **post_args) as resp: + # Saving latest response headers in the transport + self.response_headers = resp.headers + + # Check for errors + if resp.status >= 400: + # Raise a TransportServerError if status > 400 + self._raise_transport_server_error_if_status_more_than_400(resp) + + initial_content_type = resp.headers.get("Content-Type", "") + if ( + "application/json" in initial_content_type + and "multipart/mixed" not in initial_content_type + ): + yield await self._prepare_result(resp) + return + + if ( + ("multipart/mixed" not in initial_content_type) + or ("boundary=graphql" not in initial_content_type) + or ("subscriptionSpec=1.0" not in initial_content_type) + ): + raise TransportProtocolError( + f"Unexpected content-type: {initial_content_type}. " + "Server may not support the multipart subscription protocol." + ) + + # Parse multipart response + async for result in self._parse_multipart_response(resp): + yield result + + except TransportError: + raise + except Exception as e: + raise TransportConnectionFailed(str(e)) from e + + async def _parse_multipart_response( + self, + response: aiohttp.ClientResponse, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Parse a multipart response stream and yield execution results. + + Uses aiohttp's built-in MultipartReader to handle the multipart protocol. + + :param response: The aiohttp response object + :yields: ExecutionResult objects + """ + # Use aiohttp's built-in multipart reader + reader = MultipartReader.from_response(response) + + # Iterate through each part in the multipart response + while True: + try: + part = await reader.next() + except Exception: + # reader.next() throws on empty parts at the end of the stream. + # (some servers may send this.) + # see: https://github.com/aio-libs/aiohttp/pull/11857 + # As an ugly workaround for now, we can check if we've reached + # EOF and assume this was the case. + if reader.at_eof(): + break + + # Otherwise, re-raise unexpected errors + raise # pragma: no cover + + if part is None: + # No more parts + break + + assert not isinstance( + part, MultipartReader + ), "Nested multipart parts are not supported in GraphQL subscriptions" + + result = await self._parse_multipart_part(part) + if result: + yield result + + async def _parse_multipart_part( + self, part: BodyPartReader + ) -> Optional[ExecutionResult]: + """ + Parse a single part from a multipart response. + + :param part: aiohttp BodyPartReader for the part + :return: ExecutionResult or None if part is empty/heartbeat + """ + # Verify the part has the correct content type + content_type = part.headers.get(aiohttp.hdrs.CONTENT_TYPE, "") + if not content_type.startswith("application/json"): + raise TransportProtocolError( + f"Unexpected part content-type: {content_type}. " + "Expected 'application/json'." + ) + + try: + # Read the part content as text + body = await part.text() + body = body.strip() + + if log.isEnabledFor(logging.DEBUG): + log.debug("<<< %s", ascii(body or "(empty body, skipping)")) + + if not body: + return None + + # Parse JSON body using custom deserializer + data = self.json_deserialize(body) + + # Handle heartbeats - empty JSON objects + if not data: + log.debug("Received heartbeat, ignoring") + return None + + # The multipart subscription protocol wraps data in a "payload" property + if "payload" not in data: + log.warning("Invalid response: missing 'payload' field") + return None + + payload = data["payload"] + + # Check for transport-level errors (payload is null) + if payload is None: + # If there are errors, this is a transport-level error + errors = data.get("errors") + if errors: + error_messages = [ + error.get("message", "Unknown transport error") + for error in errors + ] + + for message in error_messages: + log.error(f"Transport error: {message}") + + raise TransportServerError("\n\n".join(error_messages)) + else: + # Null payload without errors - just skip this part + return None + + # Extract GraphQL data from payload + return ExecutionResult( + data=payload.get("data"), + errors=payload.get("errors"), + extensions=payload.get("extensions"), + ) + except json.JSONDecodeError as e: + log.warning( + f"Failed to parse JSON: {ascii(e)}, " + f"body: {ascii(body[:100]) if body else ''}" + ) + return None + except UnicodeDecodeError as e: + log.warning(f"Failed to decode part: {ascii(e)}") + return None diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py deleted file mode 100644 index 8ea31afa..00000000 --- a/gql/transport/http_multipart_transport.py +++ /dev/null @@ -1,327 +0,0 @@ -""" -HTTP Multipart Transport for GraphQL Subscriptions - -This transport implements support for GraphQL subscriptions over HTTP using -the multipart subscription protocol as implemented by Apollo GraphOS Router -and other compatible servers. - -Reference: -https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol -""" - -import asyncio -import json -import logging -from ssl import SSLContext -from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union - -import aiohttp -from aiohttp.client_reqrep import Fingerprint -from aiohttp.helpers import BasicAuth -from aiohttp.typedefs import LooseCookies, LooseHeaders -from graphql import ExecutionResult -from multidict import CIMultiDictProxy - -from gql.graphql_request import GraphQLRequest -from gql.transport.async_transport import AsyncTransport -from gql.transport.common.aiohttp_closed_event import create_aiohttp_closed_event -from gql.transport.exceptions import ( - TransportAlreadyConnected, - TransportClosed, - TransportConnectionFailed, - TransportProtocolError, - TransportServerError, -) - -log = logging.getLogger(__name__) - - -class HTTPMultipartTransport(AsyncTransport): - """ - Async Transport for GraphQL subscriptions using the multipart subscription protocol. - - This transport sends GraphQL subscription queries via HTTP POST and receives - streaming multipart/mixed responses, where each part contains a JSON payload - with GraphQL execution results. This protocol is implemented by Apollo GraphOS - Router and other compatible servers. - """ - - def __init__( - self, - url: str, - headers: Optional[LooseHeaders] = None, - cookies: Optional[LooseCookies] = None, - auth: Optional[BasicAuth] = None, - ssl: Union[SSLContext, bool, Fingerprint] = True, - timeout: Optional[int] = None, - ssl_close_timeout: Optional[Union[int, float]] = 10, - json_serialize: Callable = json.dumps, - json_deserialize: Callable = json.loads, - client_session_args: Optional[Dict[str, Any]] = None, - ) -> None: - """ - Initialize the HTTP Multipart transport. - - :param url: The GraphQL server URL (http or https) - :param headers: Dict of HTTP Headers - :param cookies: Dict of HTTP cookies - :param auth: BasicAuth object for HTTP authentication - :param ssl: SSL context or validation mode - :param timeout: Request timeout in seconds - :param ssl_close_timeout: Timeout for SSL connection close - :param json_serialize: JSON serializer function - :param json_deserialize: JSON deserializer function - :param client_session_args: Extra args for aiohttp.ClientSession - """ - self.url = url - self.headers = headers or {} - self.cookies = cookies - self.auth = auth - self.ssl = ssl - self.timeout = timeout - self.ssl_close_timeout = ssl_close_timeout - self.json_serialize = json_serialize - self.json_deserialize = json_deserialize - self.client_session_args = client_session_args or {} - - self.session: Optional[aiohttp.ClientSession] = None - self.response_headers: Optional[CIMultiDictProxy[str]] = None - - async def connect(self) -> None: - """Create an aiohttp ClientSession.""" - if self.session is not None: - raise TransportAlreadyConnected("Transport is already connected") - - client_session_args: Dict[str, Any] = { - "cookies": self.cookies, - "headers": self.headers, - "auth": self.auth, - "json_serialize": self.json_serialize, - } - - if self.timeout is not None: - client_session_args["timeout"] = aiohttp.ClientTimeout(total=self.timeout) - - client_session_args.update(self.client_session_args) - - log.debug("Connecting HTTP Multipart transport") - self.session = aiohttp.ClientSession(**client_session_args) - - async def close(self) -> None: - """Close the aiohttp session.""" - if self.session is not None: - log.debug("Closing HTTP Multipart transport") - - if ( - self.client_session_args - and self.client_session_args.get("connector_owner") is False - ): - log.debug("connector_owner is False -> not closing connector") - else: - closed_event = create_aiohttp_closed_event(self.session) - await self.session.close() - try: - await asyncio.wait_for(closed_event.wait(), self.ssl_close_timeout) - except asyncio.TimeoutError: - pass - - self.session = None - - async def subscribe( - self, - request: GraphQLRequest, - ) -> AsyncGenerator[ExecutionResult, None]: - """ - Execute a GraphQL subscription and yield results from multipart response. - - :param request: GraphQL request to execute - :yields: ExecutionResult objects as they arrive in the multipart stream - """ - if self.session is None: - raise TransportClosed("Transport is not connected") - - payload = request.payload - if log.isEnabledFor(logging.DEBUG): - log.debug(">>> %s", self.json_serialize(payload)) - - headers = { - "Content-Type": "application/json", - "Accept": ( - "multipart/mixed;boundary=graphql;" - "subscriptionSpec=1.0,application/json" - ), - } - - try: - # Make the POST request - async with self.session.post( - self.url, - json=payload, - headers=headers, - ssl=self.ssl, - ) as response: - # Save response headers - self.response_headers = response.headers - - # Check for errors - if response.status >= 400: - error_text = await response.text() - raise TransportServerError( - f"Server returned {response.status}: {error_text}", - response.status, - ) - - initial_content_type = response.headers.get("Content-Type", "") - if ( - ("multipart/mixed" not in initial_content_type) - or ("boundary=graphql" not in initial_content_type) - or ("subscriptionSpec=1.0" not in initial_content_type) - or ("application/json" not in initial_content_type) - ): - raise TransportProtocolError( - f"Unexpected content-type: {initial_content_type}. " - "Server may not support the multipart subscription protocol." - ) - - # Parse multipart response - async for result in self._parse_multipart_response(response): - yield result - - except (TransportServerError, TransportProtocolError): - # Let these exceptions propagate without wrapping - raise - except Exception as e: - raise TransportConnectionFailed(str(e)) from e - - async def _parse_multipart_response( - self, - response: aiohttp.ClientResponse, - ) -> AsyncGenerator[ExecutionResult, None]: - """ - Parse a multipart response stream and yield execution results. - - Uses aiohttp's built-in MultipartReader to handle the multipart protocol. - - :param response: The aiohttp response object - :yields: ExecutionResult objects - """ - # Use aiohttp's built-in multipart reader - reader = aiohttp.MultipartReader.from_response(response) - - # Iterate through each part in the multipart response - while True: - try: - part = await reader.next() - except Exception: - # reader.next() throws on empty parts at the end of the stream. - # (some servers may send this.) - # see: https://github.com/aio-libs/aiohttp/pull/11857 - # As an ugly workaround for now, we can check if we've reached - # EOF and assume this was the case. - if reader.at_eof(): - break - - # Otherwise, re-raise unexpected errors - raise # pragma: no cover - - if part is None: - # No more parts - break - - assert not isinstance( - part, aiohttp.MultipartReader - ), "Nested multipart parts are not supported in GraphQL subscriptions" - - result = await self._parse_multipart_part(part) - if result: - yield result - - async def _parse_multipart_part( - self, part: aiohttp.BodyPartReader - ) -> Optional[ExecutionResult]: - """ - Parse a single part from a multipart response. - - :param part: aiohttp BodyPartReader for the part - :return: ExecutionResult or None if part is empty/heartbeat - """ - # Verify the part has the correct content type - content_type = part.headers.get(aiohttp.hdrs.CONTENT_TYPE, "") - if not content_type.startswith("application/json"): - raise TransportProtocolError( - f"Unexpected part content-type: {content_type}. " - "Expected 'application/json'." - ) - - try: - # Read the part content as text - body = await part.text() - body = body.strip() - - if log.isEnabledFor(logging.DEBUG): - log.debug("<<< %s", ascii(body or "(empty body, skipping)")) - - if not body: - return None - - # Parse JSON body using custom deserializer - data = self.json_deserialize(body) - - # Handle heartbeats - empty JSON objects - if not data: - log.debug("Received heartbeat, ignoring") - return None - - # The multipart subscription protocol wraps data in a "payload" property - if "payload" not in data: - log.warning("Invalid response: missing 'payload' field") - return None - - payload = data["payload"] - - # Check for transport-level errors (payload is null) - if payload is None: - # If there are errors, this is a transport-level error - errors = data.get("errors") - if errors: - error_messages = [ - error.get("message", "Unknown transport error") - for error in errors - ] - - for message in error_messages: - log.error(f"Transport error: {message}") - - raise TransportServerError("\n\n".join(error_messages)) - else: - # Null payload without errors - just skip this part - return None - - # Extract GraphQL data from payload - return ExecutionResult( - data=payload.get("data"), - errors=payload.get("errors"), - extensions=payload.get("extensions"), - ) - except json.JSONDecodeError as e: - log.warning( - f"Failed to parse JSON: {ascii(e)}, " - f"body: {ascii(body[:100]) if body else ''}" - ) - return None - except UnicodeDecodeError as e: - log.warning(f"Failed to decode part: {ascii(e)}") - return None - - async def execute( - self, - request: GraphQLRequest, - ) -> ExecutionResult: - """ - :raises: NotImplementedError - This transport only supports subscriptions - """ - raise NotImplementedError( - "The HTTP multipart transport does not support queries or " - "mutations. Use HTTPTransport for queries and mutations, or use " - "subscribe() for subscriptions." - ) diff --git a/tests/test_aiohttp.py b/tests/test_aiohttp.py index 506b04f4..102fe3f2 100644 --- a/tests/test_aiohttp.py +++ b/tests/test_aiohttp.py @@ -353,32 +353,6 @@ async def handler(request): assert param["expected_exception"] in str(exc_info.value) -@pytest.mark.asyncio -async def test_aiohttp_subscribe_not_supported(aiohttp_server): - from aiohttp import web - - from gql.transport.aiohttp import AIOHTTPTransport - - async def handler(request): - return web.Response(text="does not matter", content_type="application/json") - - app = web.Application() - app.router.add_route("POST", "/", handler) - server = await aiohttp_server(app) - - url = server.make_url("/") - - transport = AIOHTTPTransport(url=url) - - async with Client(transport=transport) as session: - - query = gql(query1_str) - - with pytest.raises(NotImplementedError): - async for result in session.subscribe(query): - pass - - @pytest.mark.asyncio async def test_aiohttp_cannot_connect_twice(aiohttp_server): from aiohttp import web @@ -590,16 +564,17 @@ def test_code(): query = gql(query1_str) - # Note: subscriptions are not supported on the aiohttp transport - # But we add this test in order to have 100% code coverage # It is to check that we will correctly set an event loop # in the subscribe function if there is none (in a Thread for example) # We cannot test this with the websockets transport because # the websockets transport will set an event loop in its init - with pytest.raises(NotImplementedError): - for result in client.subscribe(query): - pass + results = [] + for result in client.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["continents"][0]["code"] == "AF" await run_sync_test(server, test_code) diff --git a/tests/test_http_multipart_transport.py b/tests/test_aiohttp_multipart.py similarity index 74% rename from tests/test_http_multipart_transport.py rename to tests/test_aiohttp_multipart.py index 75bc7e1b..eb7675d7 100644 --- a/tests/test_http_multipart_transport.py +++ b/tests/test_aiohttp_multipart.py @@ -97,8 +97,8 @@ async def handler(request): @pytest.mark.asyncio -async def test_http_multipart_subscription(multipart_server): - from gql.transport.http_multipart_transport import HTTPMultipartTransport +async def test_aiohttp_multipart_subscription(multipart_server): + from gql.transport.aiohttp import AIOHTTPTransport def assert_response_headers(request): # Verify the Accept header follows the spec @@ -111,7 +111,7 @@ def assert_response_headers(request): parts = create_multipart_response([book1, book2]) server = await multipart_server(parts, request_handler=assert_response_headers) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) query = gql(subscription_str) @@ -127,13 +127,13 @@ def assert_response_headers(request): @pytest.mark.asyncio -async def test_http_multipart_subscription_with_heartbeat(multipart_server): - from gql.transport.http_multipart_transport import HTTPMultipartTransport +async def test_aiohttp_multipart_subscription_with_heartbeat(multipart_server): + from gql.transport.aiohttp import AIOHTTPTransport parts = create_multipart_response([book1, book2], include_heartbeat=True) server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) query = gql(subscription_str) @@ -150,10 +150,10 @@ async def test_http_multipart_subscription_with_heartbeat(multipart_server): @pytest.mark.aiohttp @pytest.mark.asyncio -async def test_http_multipart_unsupported_content_type(aiohttp_server): +async def test_aiohttp_multipart_unsupported_content_type(aiohttp_server): from aiohttp import web - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport async def handler(request): # Return text/html instead of application/json @@ -162,7 +162,7 @@ async def handler(request): app = web.Application() app.router.add_route("POST", "/", handler) server = await aiohttp_server(app) - transport = HTTPMultipartTransport(url=server.make_url("/")) + transport = AIOHTTPTransport(url=server.make_url("/")) query = gql(subscription_str) @@ -176,10 +176,10 @@ async def handler(request): @pytest.mark.aiohttp @pytest.mark.asyncio -async def test_http_multipart_server_error(aiohttp_server): +async def test_aiohttp_multipart_server_error(aiohttp_server): from aiohttp import web - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport async def handler(request): return web.Response(text="Internal Server Error", status=500) @@ -187,7 +187,7 @@ async def handler(request): app = web.Application() app.router.add_route("POST", "/", handler) server = await aiohttp_server(app) - transport = HTTPMultipartTransport(url=server.make_url("/")) + transport = AIOHTTPTransport(url=server.make_url("/")) query = gql(subscription_str) @@ -196,12 +196,12 @@ async def handler(request): async for result in session.subscribe(query): pass - assert "500: Internal Server Error" in str(exc_info.value) + assert "Internal Server Error" in str(exc_info.value) @pytest.mark.asyncio -async def test_http_multipart_transport_level_error(multipart_server): - from gql.transport.http_multipart_transport import HTTPMultipartTransport +async def test_aiohttp_multipart_transport_level_error(multipart_server): + from gql.transport.aiohttp import AIOHTTPTransport # Transport error has null payload with errors at top level error_response = { @@ -220,7 +220,7 @@ async def test_http_multipart_transport_level_error(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) @@ -233,9 +233,9 @@ async def test_http_multipart_transport_level_error(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_graphql_errors(multipart_server): +async def test_aiohttp_multipart_graphql_errors(multipart_server): + from gql.transport.aiohttp import AIOHTTPTransport from gql.transport.exceptions import TransportQueryError - from gql.transport.http_multipart_transport import HTTPMultipartTransport # GraphQL errors come inside the payload response = { @@ -258,7 +258,7 @@ async def test_http_multipart_graphql_errors(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) @@ -277,31 +277,12 @@ async def test_http_multipart_graphql_errors(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_execute_method(multipart_server): - from gql.transport.http_multipart_transport import HTTPMultipartTransport - - parts = create_multipart_response([book1, book2]) - server = await multipart_server(parts) - url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) - - query = gql(subscription_str) - - async with Client(transport=transport) as session: - # execute() should raise NotImplementedError - with pytest.raises(NotImplementedError) as exc_info: - await session.execute(query) - - assert "does not support queries or mutations" in str(exc_info.value) - - -@pytest.mark.asyncio -async def test_http_multipart_transport_already_connected(multipart_server): - from gql.transport.http_multipart_transport import HTTPMultipartTransport +async def test_aiohttp_multipart_transport_already_connected(multipart_server): + from gql.transport.aiohttp import AIOHTTPTransport parts = create_multipart_response([]) server = await multipart_server(parts) - transport = HTTPMultipartTransport(url=server.make_url("/")) + transport = AIOHTTPTransport(url=server.make_url("/")) await transport.connect() @@ -312,12 +293,12 @@ async def test_http_multipart_transport_already_connected(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_transport_not_connected(multipart_server): - from gql.transport.http_multipart_transport import HTTPMultipartTransport +async def test_aiohttp_multipart_transport_not_connected(multipart_server): + from gql.transport.aiohttp import AIOHTTPTransport parts = create_multipart_response([book1]) server = await multipart_server(parts) - transport = HTTPMultipartTransport(url=server.make_url("/")) + transport = AIOHTTPTransport(url=server.make_url("/")) query = gql(subscription_str) request = GraphQLRequest(query) @@ -328,34 +309,15 @@ async def test_http_multipart_transport_not_connected(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_execute_empty_response(multipart_server): - from gql.transport.http_multipart_transport import HTTPMultipartTransport - - # Return empty multipart response (no data parts) - parts = ["--graphql--\r\n"] - server = await multipart_server(parts) - transport = HTTPMultipartTransport(url=server.make_url("/")) - - async with Client(transport=transport) as session: - query = gql(subscription_str) - - # execute() should raise NotImplementedError - with pytest.raises(NotImplementedError) as exc_info: - await session.execute(query) - - assert "does not support queries or mutations" in str(exc_info.value) - - -@pytest.mark.asyncio -async def test_http_multipart_newline_separator(multipart_server): +async def test_aiohttp_multipart_newline_separator(multipart_server): """Test that LF-only separators are rejected (spec requires CRLF).""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport # The GraphQL over HTTP spec requires CRLF line endings in multipart responses # https://github.com/graphql/graphql-over-http/blob/main/rfcs/IncrementalDelivery.md parts = create_multipart_response([book1], separator="\n") server = await multipart_server(parts) - transport = HTTPMultipartTransport(url=server.make_url("/")) + transport = AIOHTTPTransport(url=server.make_url("/")) query = gql(subscription_str) @@ -367,11 +329,11 @@ async def test_http_multipart_newline_separator(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_transport_connection_failed_error(): - from gql.transport.http_multipart_transport import HTTPMultipartTransport +async def test_aiohttp_multipart_transport_connection_failed_error(): + from gql.transport.aiohttp import AIOHTTPTransport # Use an invalid URL that will fail to connect - transport = HTTPMultipartTransport(url="http://invalid.local:-1/graphql", timeout=1) + transport = AIOHTTPTransport(url="http://invalid.local:-1/graphql", timeout=1) query = gql(subscription_str) @@ -382,15 +344,15 @@ async def test_http_multipart_transport_connection_failed_error(): @pytest.mark.asyncio -async def test_http_multipart_connector_owner_false(multipart_server): +async def test_aiohttp_multipart_connector_owner_false(multipart_server): """Test closing transport with connector_owner=False.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport parts = create_multipart_response([book1]) server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport( + transport = AIOHTTPTransport( url=url, client_session_args={"connector_owner": False} ) @@ -405,21 +367,21 @@ async def test_http_multipart_connector_owner_false(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_ssl_close_timeout(multipart_server): +async def test_aiohttp_multipart_ssl_close_timeout(multipart_server): """Test SSL close timeout during transport close.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport parts = create_multipart_response([book1], separator="\n") server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url, ssl_close_timeout=0.001) + transport = AIOHTTPTransport(url=url, ssl_close_timeout=0.001) await transport.connect() # Mock the closed event to timeout with patch( - "gql.transport.http_multipart_transport.create_aiohttp_closed_event" + "gql.transport.common.aiohttp_closed_event.create_aiohttp_closed_event" ) as mock_event: mock_wait = AsyncMock() mock_wait.side_effect = asyncio.TimeoutError() @@ -430,9 +392,9 @@ async def test_http_multipart_ssl_close_timeout(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_malformed_json(multipart_server): +async def test_aiohttp_multipart_malformed_json(multipart_server): """Test handling of malformed JSON in multipart response.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport parts = [ ( @@ -446,7 +408,7 @@ async def test_http_multipart_malformed_json(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) @@ -460,9 +422,9 @@ async def test_http_multipart_malformed_json(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_payload_null_no_errors(multipart_server): +async def test_aiohttp_multipart_payload_null_no_errors(multipart_server): """Test handling of null payload without errors.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport # Null payload but no errors response = {"payload": None} @@ -478,7 +440,7 @@ async def test_http_multipart_payload_null_no_errors(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) @@ -491,9 +453,9 @@ async def test_http_multipart_payload_null_no_errors(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_invalid_utf8(multipart_server): +async def test_aiohttp_multipart_invalid_utf8(multipart_server): """Test handling of invalid UTF-8 in multipart response.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport parts = [ ( @@ -507,7 +469,7 @@ async def test_http_multipart_invalid_utf8(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) @@ -520,9 +482,9 @@ async def test_http_multipart_invalid_utf8(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_chunked_boundary_split(multipart_server): +async def test_aiohttp_multipart_chunked_boundary_split(multipart_server): """Test parsing when boundary is split across chunks.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport parts = [ "--gra", @@ -535,7 +497,7 @@ async def test_http_multipart_chunked_boundary_split(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) @@ -548,9 +510,9 @@ async def test_http_multipart_chunked_boundary_split(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_wrong_part_content_type(multipart_server): +async def test_aiohttp_multipart_wrong_part_content_type(multipart_server): """Test that parts with wrong content-type raise an error.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport # Part with text/html instead of application/json parts = [ @@ -560,7 +522,7 @@ async def test_http_multipart_wrong_part_content_type(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) @@ -574,14 +536,14 @@ async def test_http_multipart_wrong_part_content_type(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_response_headers(multipart_server): +async def test_aiohttp_multipart_response_headers(multipart_server): """Test that response headers are captured in the transport.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport parts = create_multipart_response([book1]) server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) query = gql(subscription_str) @@ -597,9 +559,9 @@ async def test_http_multipart_response_headers(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_empty_body(multipart_server): +async def test_aiohttp_multipart_empty_body(multipart_server): """Test part with empty body after stripping.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport # Part with only whitespace body parts = [ @@ -609,7 +571,7 @@ async def test_http_multipart_empty_body(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) @@ -620,9 +582,9 @@ async def test_http_multipart_empty_body(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_missing_payload_field(multipart_server): +async def test_aiohttp_multipart_missing_payload_field(multipart_server): """Test handling of response missing required 'payload' field.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport response = {"foo": "bar"} # No payload field! parts = [ @@ -637,7 +599,7 @@ async def test_http_multipart_missing_payload_field(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) @@ -650,9 +612,9 @@ async def test_http_multipart_missing_payload_field(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_with_content_length_headers(multipart_server): +async def test_aiohttp_multipart_with_content_length_headers(multipart_server): """Test multipart response with Content-Length headers (like real servers send).""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport # Simulate real server behavior: each part has Content-Length header book1_payload = json.dumps({"payload": {"data": {"book": book1}}}) @@ -687,7 +649,7 @@ async def test_http_multipart_with_content_length_headers(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) @@ -702,9 +664,9 @@ async def test_http_multipart_with_content_length_headers(multipart_server): @pytest.mark.asyncio -async def test_http_multipart_actually_invalid_utf8(multipart_server): +async def test_aiohttp_multipart_actually_invalid_utf8(multipart_server): """Test handling of ACTUAL invalid UTF-8 bytes in multipart response.""" - from gql.transport.http_multipart_transport import HTTPMultipartTransport + from gql.transport.aiohttp import AIOHTTPTransport # \\x80 is an invalid start byte in UTF-8 parts = [ @@ -719,7 +681,7 @@ async def test_http_multipart_actually_invalid_utf8(multipart_server): server = await multipart_server(parts) url = server.make_url("/") - transport = HTTPMultipartTransport(url=url) + transport = AIOHTTPTransport(url=url) async with Client(transport=transport) as session: query = gql(subscription_str) From 057ec8a8fdcb9d4d0d3c716a4423f748a74d7d91 Mon Sep 17 00:00:00 2001 From: Leszek Hanusz Date: Tue, 23 Dec 2025 13:36:50 +0100 Subject: [PATCH 4/7] Remove redundant tests --- tests/test_aiohttp_multipart.py | 87 ++++++--------------------------- 1 file changed, 16 insertions(+), 71 deletions(-) diff --git a/tests/test_aiohttp_multipart.py b/tests/test_aiohttp_multipart.py index eb7675d7..443e9b60 100644 --- a/tests/test_aiohttp_multipart.py +++ b/tests/test_aiohttp_multipart.py @@ -7,7 +7,6 @@ from gql import Client, gql from gql.graphql_request import GraphQLRequest from gql.transport.exceptions import ( - TransportAlreadyConnected, TransportClosed, TransportConnectionFailed, TransportProtocolError, @@ -199,6 +198,22 @@ async def handler(request): assert "Internal Server Error" in str(exc_info.value) +@pytest.mark.asyncio +async def test_aiohttp_multipart_transport_not_connected(multipart_server): + from gql.transport.aiohttp import AIOHTTPTransport + + parts = create_multipart_response([book1]) + server = await multipart_server(parts) + transport = AIOHTTPTransport(url=server.make_url("/")) + + query = gql(subscription_str) + request = GraphQLRequest(query) + + with pytest.raises(TransportClosed): + async for result in transport.subscribe(request): + pass + + @pytest.mark.asyncio async def test_aiohttp_multipart_transport_level_error(multipart_server): from gql.transport.aiohttp import AIOHTTPTransport @@ -276,38 +291,6 @@ async def test_aiohttp_multipart_graphql_errors(multipart_server): assert exc_info.value.data["book"]["title"] == "Book 1" -@pytest.mark.asyncio -async def test_aiohttp_multipart_transport_already_connected(multipart_server): - from gql.transport.aiohttp import AIOHTTPTransport - - parts = create_multipart_response([]) - server = await multipart_server(parts) - transport = AIOHTTPTransport(url=server.make_url("/")) - - await transport.connect() - - with pytest.raises(TransportAlreadyConnected): - await transport.connect() - - await transport.close() - - -@pytest.mark.asyncio -async def test_aiohttp_multipart_transport_not_connected(multipart_server): - from gql.transport.aiohttp import AIOHTTPTransport - - parts = create_multipart_response([book1]) - server = await multipart_server(parts) - transport = AIOHTTPTransport(url=server.make_url("/")) - - query = gql(subscription_str) - request = GraphQLRequest(query) - - with pytest.raises(TransportClosed): - async for result in transport.subscribe(request): - pass - - @pytest.mark.asyncio async def test_aiohttp_multipart_newline_separator(multipart_server): """Test that LF-only separators are rejected (spec requires CRLF).""" @@ -328,44 +311,6 @@ async def test_aiohttp_multipart_newline_separator(multipart_server): pass -@pytest.mark.asyncio -async def test_aiohttp_multipart_transport_connection_failed_error(): - from gql.transport.aiohttp import AIOHTTPTransport - - # Use an invalid URL that will fail to connect - transport = AIOHTTPTransport(url="http://invalid.local:-1/graphql", timeout=1) - - query = gql(subscription_str) - - async with Client(transport=transport) as session: - with pytest.raises(TransportConnectionFailed): - async for result in session.subscribe(query): - pass - - -@pytest.mark.asyncio -async def test_aiohttp_multipart_connector_owner_false(multipart_server): - """Test closing transport with connector_owner=False.""" - from gql.transport.aiohttp import AIOHTTPTransport - - parts = create_multipart_response([book1]) - server = await multipart_server(parts) - url = server.make_url("/") - - transport = AIOHTTPTransport( - url=url, client_session_args={"connector_owner": False} - ) - - query = gql(subscription_str) - - async with Client(transport=transport) as session: - results = [] - async for result in session.subscribe(query): - results.append(result) - - assert len(results) == 1 - - @pytest.mark.asyncio async def test_aiohttp_multipart_ssl_close_timeout(multipart_server): """Test SSL close timeout during transport close.""" From 9daac5c95c2fd3524a310180118b3adb3dfba68e Mon Sep 17 00:00:00 2001 From: Leszek Hanusz Date: Tue, 23 Dec 2025 13:55:59 +0100 Subject: [PATCH 5/7] Adding new test to have 100% code coverage now that aiohttp transport supports subscriptions --- tests/test_httpx_async.py | 59 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tests/test_httpx_async.py b/tests/test_httpx_async.py index 690b3ee7..278d3c46 100644 --- a/tests/test_httpx_async.py +++ b/tests/test_httpx_async.py @@ -1447,3 +1447,62 @@ async def handler(request): pi = result["pi"] assert pi == Decimal("3.141592653589793238462643383279502884197") + + +@pytest.mark.aiohttp +@pytest.mark.asyncio +async def test_httpx_subscribe_not_supported_cli(aiohttp_server): + """Test that the CLI falls back to execute when subscribe is not supported.""" + from aiohttp import web + + from gql.transport.httpx import HTTPXAsyncTransport + + async def handler(request): + return web.Response(text=query1_server_answer, content_type="application/json") + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = str(server.make_url("/")) + + transport = HTTPXAsyncTransport(url=url) + + async with Client(transport=transport) as _: + + # Define arguments for the CLI + # We use the query "query getContinents..." + import io + import sys + from io import StringIO + + from gql import cli + + test_args = ["gql-cli", url, "--transport", "httpx"] + + # Mock sys.stdin to provide the query + sys.stdin = io.StringIO(query1_str) + + # Capture stdout + captured_output = StringIO() + original_stdout = sys.stdout + sys.stdout = captured_output + + try: + # We need to mock sys.argv as well because cli.get_parser() uses + # usage from sys.argv[0] sometimes, + # but mainly passing args to parse_args is cleaner. + parser = cli.get_parser() + parsed_args = parser.parse_args(test_args[1:]) # skip prog name + + exit_code = await cli.main(parsed_args) + assert exit_code == 0 + + except SystemExit: + pass + finally: + sys.stdout = original_stdout + sys.stdin = sys.__stdin__ # Restore stdin + + output = captured_output.getvalue() + assert "Africa" in output From 52fbedc05c3c123c991b5e58ed56bc7eab5b73cc Mon Sep 17 00:00:00 2001 From: Leszek Hanusz Date: Tue, 23 Dec 2025 15:01:43 +0100 Subject: [PATCH 6/7] Remove unneeded line in conftest.py --- tests/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9de910ed..cef561f7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -128,7 +128,6 @@ async def ssl_aiohttp_server(): "gql.transport.appsync", "gql.transport.common.base", "gql.transport.httpx", - "gql.transport.http_multipart_transport", "gql.transport.phoenix_channel_websockets", "gql.transport.requests", "gql.transport.websockets", From b17670b0cfaf69b7c26c0d7c7f2a6d0a60702170 Mon Sep 17 00:00:00 2001 From: Leszek Hanusz Date: Tue, 23 Dec 2025 15:06:54 +0100 Subject: [PATCH 7/7] Adding multipart protocol to README.md --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 86f380f3..f5fddad4 100644 --- a/README.md +++ b/README.md @@ -30,13 +30,14 @@ The complete documentation for GQL can be found at ## Features -* Execute GraphQL queries using [different protocols](https://gql.readthedocs.io/en/latest/transports/index.html): +* Execute GraphQL requests using [different protocols](https://gql.readthedocs.io/en/latest/transports/index.html): * http + * including the multipart protocol for subscriptions * websockets: * apollo or graphql-ws protocol * Phoenix channels - * AWS AppSync realtime protocol (experimental) -* Possibility to [validate the queries locally](https://gql.readthedocs.io/en/latest/usage/validation.html) using a GraphQL schema provided locally or fetched from the backend using an instrospection query + * AWS AppSync realtime protocol +* Possibility to [validate the requests locally](https://gql.readthedocs.io/en/latest/usage/validation.html) using a GraphQL schema provided locally or fetched from the backend using an instrospection query * Supports GraphQL queries, mutations and [subscriptions](https://gql.readthedocs.io/en/latest/usage/subscriptions.html) * Supports [sync](https://gql.readthedocs.io/en/latest/usage/sync_usage.html) or [async](https://gql.readthedocs.io/en/latest/usage/async_usage.html) usage, [allowing concurrent requests](https://gql.readthedocs.io/en/latest/advanced/async_advanced_usage.html#async-advanced-usage) * Supports [File uploads](https://gql.readthedocs.io/en/latest/usage/file_upload.html)