Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ The complete documentation for GQL can be found at

## Features

* Execute GraphQL queries using [different protocols](https://gql.readthedocs.io/en/latest/transports/index.html):
* Execute GraphQL requests using [different protocols](https://gql.readthedocs.io/en/latest/transports/index.html):
* http
* including the multipart protocol for subscriptions
* websockets:
* apollo or graphql-ws protocol
* Phoenix channels
* AWS AppSync realtime protocol (experimental)
* Possibility to [validate the queries locally](https://gql.readthedocs.io/en/latest/usage/validation.html) using a GraphQL schema provided locally or fetched from the backend using an instrospection query
* AWS AppSync realtime protocol
* Possibility to [validate the requests locally](https://gql.readthedocs.io/en/latest/usage/validation.html) using a GraphQL schema provided locally or fetched from the backend using an instrospection query
* Supports GraphQL queries, mutations and [subscriptions](https://gql.readthedocs.io/en/latest/usage/subscriptions.html)
* Supports [sync](https://gql.readthedocs.io/en/latest/usage/sync_usage.html) or [async](https://gql.readthedocs.io/en/latest/usage/async_usage.html) usage, [allowing concurrent requests](https://gql.readthedocs.io/en/latest/advanced/async_advanced_usage.html#async-advanced-usage)
* Supports [File uploads](https://gql.readthedocs.io/en/latest/usage/file_upload.html)
Expand Down
37 changes: 37 additions & 0 deletions docs/code_examples/aiohttp_multipart_subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio
import logging

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

logging.basicConfig(level=logging.INFO)


async def main():

transport = AIOHTTPTransport(url="https://gql-book-server.fly.dev/graphql")

# Using `async with` on the client will start a connection on the transport
# and provide a `session` variable to execute queries on this connection
async with Client(
transport=transport,
) as session:

# Request subscription
subscription = gql(
"""
subscription {
book {
title
author
}
}
"""
)

# Subscribe and receive streaming updates
async for result in session.subscribe(subscription):
print(f"Received: {result}")


asyncio.run(main())
76 changes: 71 additions & 5 deletions docs/transports/aiohttp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,79 @@ This transport uses the `aiohttp`_ library and allows you to send GraphQL querie

Reference: :class:`gql.transport.aiohttp.AIOHTTPTransport`

.. note::
This transport supports both standard GraphQL operations (queries, mutations) and subscriptions.
Subscriptions are implemented using the `multipart subscription protocol`_
as implemented by Apollo GraphOS Router and other compatible servers.

GraphQL subscriptions are not supported on the HTTP transport.
For subscriptions you should use a websockets transport:
:ref:`WebsocketsTransport <websockets_transport>` or
:ref:`AIOHTTPWebsocketsTransport <aiohttp_websockets_transport>`.
This provides an HTTP-based alternative to WebSocket transports for receiving streaming
subscription updates. It's particularly useful when:

- WebSocket connections are not available or blocked by infrastructure
- You want to use standard HTTP with existing load balancers and proxies
- The backend implements the multipart subscription protocol

Queries
-------

.. literalinclude:: ../code_examples/aiohttp_async.py

Subscriptions
-------------

The transport sends a standard HTTP POST request with an ``Accept`` header indicating
support for multipart responses:

.. code-block:: text

Accept: multipart/mixed;subscriptionSpec="1.0", application/json

The server responds with a ``multipart/mixed`` content type and streams subscription
updates as separate parts in the response body. Each part contains a JSON payload
with GraphQL execution results.

.. literalinclude:: ../code_examples/aiohttp_multipart_subscription.py

How It Works
^^^^^^^^^^^^

**Message Format**

Each message part follows this structure:

.. code-block:: text

--graphql
Content-Type: application/json

{"payload": {"data": {...}, "errors": [...]}}

**Heartbeats**

Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the
connection alive. These are automatically filtered out by the transport.

**Error Handling**

The protocol distinguishes between two types of errors:

- **GraphQL errors**: Returned within the ``payload`` property alongside data
- **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload

**End of Stream**

The subscription ends when the server sends the final boundary marker:

.. code-block:: text

--graphql--

Limitations
^^^^^^^^^^^

- Subscriptions require the server to implement the multipart subscription protocol
- Long-lived connections may be terminated by intermediate proxies or load balancers
- Some server configurations may not support HTTP/1.1 chunked transfer encoding required for streaming

Authentication
--------------

Expand Down Expand Up @@ -52,3 +116,5 @@ and you can save these cookies in a cookie jar to reuse them in a following conn

.. _aiohttp: https://docs.aiohttp.org
.. _issue 197: https://github.com/graphql-python/gql/issues/197
.. _multipart subscription protocol: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol

183 changes: 179 additions & 4 deletions gql/transport/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)

import aiohttp
from aiohttp import BodyPartReader, MultipartReader
from aiohttp.client_exceptions import ClientResponseError
from aiohttp.client_reqrep import Fingerprint
from aiohttp.helpers import BasicAuth
Expand Down Expand Up @@ -421,12 +422,186 @@ async def execute_batch(
except Exception as e:
raise TransportConnectionFailed(str(e)) from e

def subscribe(
async def subscribe(
self,
request: GraphQLRequest,
) -> AsyncGenerator[ExecutionResult, None]:
"""Subscribe is not supported on HTTP.
"""Execute a GraphQL subscription and yield results from multipart response.

:meta private:
:param request: GraphQL request to execute
:yields: ExecutionResult objects as they arrive in the multipart stream
"""
raise NotImplementedError(" The HTTP transport does not support subscriptions")
if self.session is None:
raise TransportClosed("Transport is not connected")

post_args = self._prepare_request(request)

# Add headers for multipart subscription
headers = post_args.get("headers", {})
headers.update(
{
"Content-Type": "application/json",
"Accept": (
"multipart/mixed;boundary=graphql;"
"subscriptionSpec=1.0,application/json"
),
}
)
post_args["headers"] = headers

try:
async with self.session.post(self.url, ssl=self.ssl, **post_args) as resp:
# Saving latest response headers in the transport
self.response_headers = resp.headers

# Check for errors
if resp.status >= 400:
# Raise a TransportServerError if status > 400
self._raise_transport_server_error_if_status_more_than_400(resp)

initial_content_type = resp.headers.get("Content-Type", "")
if (
"application/json" in initial_content_type
and "multipart/mixed" not in initial_content_type
):
yield await self._prepare_result(resp)
return

if (
("multipart/mixed" not in initial_content_type)
or ("boundary=graphql" not in initial_content_type)
or ("subscriptionSpec=1.0" not in initial_content_type)
):
raise TransportProtocolError(
f"Unexpected content-type: {initial_content_type}. "
"Server may not support the multipart subscription protocol."
)

# Parse multipart response
async for result in self._parse_multipart_response(resp):
yield result

except TransportError:
raise
except Exception as e:
raise TransportConnectionFailed(str(e)) from e

async def _parse_multipart_response(
self,
response: aiohttp.ClientResponse,
) -> AsyncGenerator[ExecutionResult, None]:
"""
Parse a multipart response stream and yield execution results.

Uses aiohttp's built-in MultipartReader to handle the multipart protocol.

:param response: The aiohttp response object
:yields: ExecutionResult objects
"""
# Use aiohttp's built-in multipart reader
reader = MultipartReader.from_response(response)

# Iterate through each part in the multipart response
while True:
try:
part = await reader.next()
except Exception:
# reader.next() throws on empty parts at the end of the stream.
# (some servers may send this.)
# see: https://github.com/aio-libs/aiohttp/pull/11857
# As an ugly workaround for now, we can check if we've reached
# EOF and assume this was the case.
if reader.at_eof():
break

# Otherwise, re-raise unexpected errors
raise # pragma: no cover

if part is None:
# No more parts
break

assert not isinstance(
part, MultipartReader
), "Nested multipart parts are not supported in GraphQL subscriptions"

result = await self._parse_multipart_part(part)
if result:
yield result

async def _parse_multipart_part(
self, part: BodyPartReader
) -> Optional[ExecutionResult]:
"""
Parse a single part from a multipart response.

:param part: aiohttp BodyPartReader for the part
:return: ExecutionResult or None if part is empty/heartbeat
"""
# Verify the part has the correct content type
content_type = part.headers.get(aiohttp.hdrs.CONTENT_TYPE, "")
if not content_type.startswith("application/json"):
raise TransportProtocolError(
f"Unexpected part content-type: {content_type}. "
"Expected 'application/json'."
)

try:
# Read the part content as text
body = await part.text()
body = body.strip()

if log.isEnabledFor(logging.DEBUG):
log.debug("<<< %s", ascii(body or "(empty body, skipping)"))

if not body:
return None

# Parse JSON body using custom deserializer
data = self.json_deserialize(body)

# Handle heartbeats - empty JSON objects
if not data:
log.debug("Received heartbeat, ignoring")
return None

# The multipart subscription protocol wraps data in a "payload" property
if "payload" not in data:
log.warning("Invalid response: missing 'payload' field")
return None

payload = data["payload"]

# Check for transport-level errors (payload is null)
if payload is None:
# If there are errors, this is a transport-level error
errors = data.get("errors")
if errors:
error_messages = [
error.get("message", "Unknown transport error")
for error in errors
]

for message in error_messages:
log.error(f"Transport error: {message}")

raise TransportServerError("\n\n".join(error_messages))
else:
# Null payload without errors - just skip this part
return None

# Extract GraphQL data from payload
return ExecutionResult(
data=payload.get("data"),
errors=payload.get("errors"),
extensions=payload.get("extensions"),
)
except json.JSONDecodeError as e:
log.warning(
f"Failed to parse JSON: {ascii(e)}, "
f"body: {ascii(body[:100]) if body else ''}"
)
return None
except UnicodeDecodeError as e:
log.warning(f"Failed to decode part: {ascii(e)}")
return None
Loading
Loading