Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ jobs:
- name: run SSE contract tests
run: make run-contract-tests

- name: start async SSE contract test service
run: make start-async-contract-test-service-bg

- name: run async SSE contract tests
run: make run-async-contract-tests

windows:
runs-on: windows-latest

Expand Down
11 changes: 11 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ poetry install
eval $(poetry env activate)
```

To also install the optional async dependencies (required to use `AsyncSSEClient`):

```
poetry install --extras async
```

### Testing

To run all unit tests:
Expand All @@ -36,6 +42,11 @@ To run the standardized contract tests that are run against all LaunchDarkly SSE
make contract-tests
```

To run the same contract tests against the async implementation:
```
make async-contract-tests
```

### Linting

To run the linter and check type hints:
Expand Down
19 changes: 19 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
PYTEST_FLAGS=-W error::SyntaxWarning

TEMP_TEST_OUTPUT=/tmp/sse-contract-test-service.log
TEMP_ASYNC_TEST_OUTPUT=/tmp/sse-async-contract-test-service.log

SPHINXOPTS = -W --keep-going
SPHINXBUILD = sphinx-build
Expand Down Expand Up @@ -70,3 +71,21 @@ run-contract-tests:
.PHONY: contract-tests
contract-tests: #! Run the SSE contract test harness
contract-tests: install-contract-tests-deps start-contract-test-service-bg run-contract-tests

.PHONY: start-async-contract-test-service
start-async-contract-test-service:
@cd contract-tests && poetry run python async_service.py 8001

.PHONY: start-async-contract-test-service-bg
start-async-contract-test-service-bg:
@echo "Async test service output will be captured in $(TEMP_ASYNC_TEST_OUTPUT)"
@make start-async-contract-test-service >$(TEMP_ASYNC_TEST_OUTPUT) 2>&1 &

.PHONY: run-async-contract-tests
run-async-contract-tests:
@curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/main/downloader/run.sh \
| VERSION=v2 PARAMS="-url http://localhost:8001 -debug -stop-service-at-end" sh

.PHONY: async-contract-tests
async-contract-tests: #! Run the SSE async contract test harness
async-contract-tests: install-contract-tests-deps start-async-contract-test-service-bg run-async-contract-tests
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ This package's primary purpose is to support the [LaunchDarkly SDK for Python](h
* Setting read timeouts, custom headers, and other HTTP request properties.
* Specifying that connections should be retried under circumstances where the standard EventSource behavior would not retry them, such as if the server returns an HTTP error status.

This is a synchronous implementation which blocks the caller's thread when reading events or reconnecting. By default, it uses `urllib3` to make HTTP requests, but it can be configured to read any input stream.
The default `SSEClient` is a synchronous implementation which blocks the caller's thread when reading events or reconnecting. By default, it uses `urllib3` to make HTTP requests, but it can be configured to read any input stream.

An async implementation, `AsyncSSEClient`, is also available for use with `asyncio`-based applications. It uses `aiohttp` for HTTP and requires installing the optional `async` extra:

```
pip install launchdarkly-eventsource[async]
```

## Supported Python versions

Expand Down
119 changes: 119 additions & 0 deletions contract-tests/async_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import json
import logging
import os
import sys
from logging.config import dictConfig

import aiohttp.web
from async_stream_entity import AsyncStreamEntity

default_port = 8000

dictConfig({
'version': 1,
'formatters': {
'default': {
'format': '[%(asctime)s] [%(name)s] %(levelname)s: %(message)s',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'default'
}
},
'root': {
'level': 'INFO',
'handlers': ['console']
},
})

global_log = logging.getLogger('testservice')

stream_counter = 0
streams = {}


async def handle_get_status(request):
body = {
'capabilities': [
'comments',
'headers',
'last-event-id',
'read-timeout',
]
}
return aiohttp.web.Response(
body=json.dumps(body),
content_type='application/json',
)


async def handle_delete_stop(request):
global_log.info("Test service has told us to exit")
os._exit(0)


async def handle_post_create_stream(request):
global stream_counter, streams

options = json.loads(await request.read())

stream_counter += 1
stream_id = str(stream_counter)
resource_url = '/streams/%s' % stream_id

stream = AsyncStreamEntity(options, request.app['http_session'])
streams[stream_id] = stream

return aiohttp.web.Response(status=201, headers={'Location': resource_url})


async def handle_post_stream_command(request):
stream_id = request.match_info['id']
params = json.loads(await request.read())

stream = streams.get(stream_id)
if stream is None:
return aiohttp.web.Response(status=404)
if not await stream.do_command(params.get('command')):
return aiohttp.web.Response(status=400)
return aiohttp.web.Response(status=204)


async def handle_delete_stream(request):
stream_id = request.match_info['id']

stream = streams.get(stream_id)
if stream is None:
return aiohttp.web.Response(status=404)
await stream.close()
return aiohttp.web.Response(status=204)


async def on_startup(app):
app['http_session'] = aiohttp.ClientSession()


async def on_cleanup(app):
await app['http_session'].close()


def make_app():
app = aiohttp.web.Application()
app.router.add_get('/', handle_get_status)
app.router.add_delete('/', handle_delete_stop)
app.router.add_post('/', handle_post_create_stream)
app.router.add_post('/streams/{id}', handle_post_stream_command)
app.router.add_delete('/streams/{id}', handle_delete_stream)
app.on_startup.append(on_startup)
app.on_cleanup.append(on_cleanup)
return app


if __name__ == "__main__":
port = default_port
if sys.argv[len(sys.argv) - 1] != 'async_service.py':
port = int(sys.argv[len(sys.argv) - 1])
global_log.info('Listening on port %d', port)
aiohttp.web.run_app(make_app(), host='0.0.0.0', port=port)
116 changes: 116 additions & 0 deletions contract-tests/async_stream_entity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import asyncio
import json
import logging
import os
import sys
import traceback

import aiohttp

# Import ld_eventsource from parent directory
sys.path.insert(1, os.path.join(sys.path[0], '..'))
from ld_eventsource.actions import Comment, Event, Fault # noqa: E402
from ld_eventsource.async_client import AsyncSSEClient # noqa: E402
from ld_eventsource.config.async_connect_strategy import \
AsyncConnectStrategy # noqa: E402
from ld_eventsource.config.error_strategy import ErrorStrategy # noqa: E402


def millis_to_seconds(t):
return None if t is None else t / 1000


class AsyncStreamEntity:
def __init__(self, options, http_session: aiohttp.ClientSession):
self.options = options
self.callback_url = options["callbackUrl"]
self.log = logging.getLogger(options["tag"])
self.closed = False
self.callback_counter = 0
self.sse = None
self._http_session = http_session
asyncio.create_task(self.run())

async def run(self):
stream_url = self.options["streamUrl"]
try:
self.log.info('Opening stream from %s', stream_url)

request_options = {}
if self.options.get("readTimeoutMs") is not None:
request_options["timeout"] = aiohttp.ClientTimeout(
sock_read=millis_to_seconds(self.options.get("readTimeoutMs"))
)

connect = AsyncConnectStrategy.http(
url=stream_url,
headers=self.options.get("headers"),
aiohttp_request_options=request_options if request_options else None,
)
sse = AsyncSSEClient(
connect,
initial_retry_delay=millis_to_seconds(self.options.get("initialDelayMs")),
last_event_id=self.options.get("lastEventId"),
error_strategy=ErrorStrategy.from_lambda(
lambda _: (
ErrorStrategy.FAIL if self.closed else ErrorStrategy.CONTINUE,
None,
)
),
logger=self.log,
)
self.sse = sse
async for item in sse.all:
if isinstance(item, Event):
self.log.info('Received event from stream (%s)', item.event)
await self.send_message(
{
'kind': 'event',
'event': {
'type': item.event,
'data': item.data,
'id': item.last_event_id,
},
}
)
elif isinstance(item, Comment):
self.log.info('Received comment from stream: %s', item.comment)
await self.send_message({'kind': 'comment', 'comment': item.comment})
elif isinstance(item, Fault):
if self.closed:
break
if item.error:
self.log.info('Received error from stream: %s', item.error)
await self.send_message({'kind': 'error', 'error': str(item.error)})
except Exception as e:
self.log.info('Received error from stream: %s', e)
self.log.info(traceback.format_exc())
await self.send_message({'kind': 'error', 'error': str(e)})

async def do_command(self, command: str) -> bool:
self.log.info('Test service sent command: %s' % command)
# currently we support no special commands
return False

async def send_message(self, message):
if self.closed:
return
self.callback_counter += 1
callback_url = "%s/%d" % (self.callback_url, self.callback_counter)
try:
async with self._http_session.post(
callback_url,
data=json.dumps(message),
headers={'Content-Type': 'application/json'},
) as resp:
if resp.status >= 300 and not self.closed:
self.log.error('Callback request returned HTTP error %d', resp.status)
except Exception as e:
if not self.closed:
self.log.error('Callback request failed: %s', e)

async def close(self):
self.closed = True
if self.sse is not None:
await self.sse.close()
self.log.info('Test ended')
3 changes: 3 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,6 @@
autodoc_default_options = {
'undoc-members': False
}

# aiohttp is an optional dependency not installed during doc builds
autodoc_mock_imports = ['aiohttp']
19 changes: 18 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ LaunchDarkly Python SSE Client

This is the API reference for the `launchdarkly-eventsource <https://github.com/launchdarkly/python-eventsource/>`_ package, a `Server-Sent Events <https://html.spec.whatwg.org/multipage/server-sent-events.html>`_ client for Python. This package is used internally by the `LaunchDarkly Python SDK <https://github.com/launchdarkly/python-server-sdk>`_, but may also be useful for other purposes.


ld_eventsource module
---------------------

Expand Down Expand Up @@ -37,3 +36,21 @@ ld_eventsource.errors module
:members:
:special-members: __init__
:show-inheritance:


ld_eventsource.async_client module
-----------------------------------

.. automodule:: ld_eventsource.async_client
:members:
:special-members: __init__
:show-inheritance:


ld_eventsource.config.async_connect_strategy module
----------------------------------------------------

.. automodule:: ld_eventsource.config.async_connect_strategy
:members:
:special-members: __init__
:show-inheritance:
9 changes: 9 additions & 0 deletions ld_eventsource/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,10 @@
from ld_eventsource.sse_client import *


def __getattr__(name):
# Lazily import AsyncSSEClient so that aiohttp (an optional dependency)
# is never imported for sync-only users who don't have it installed.
if name == 'AsyncSSEClient':
from ld_eventsource.async_client import AsyncSSEClient
return AsyncSSEClient
raise AttributeError(f"module 'ld_eventsource' has no attribute {name!r}")
Loading
Loading