Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions singlestoredb/apps/_python_udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,19 @@ async def run_udf_app(
f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.',
)

# Raise the keep-alive timeout so that uvicorn closes idle connections less
# eagerly. Whichever side closes first holds the socket in TIME_WAIT (~60s on
# Linux), so server-initiated closes churn sockets under load.
keep_alive_timeout = int(
os.environ.get('SINGLESTOREDB_UDF_KEEPALIVE_TIMEOUT', '120'),
)

config = uvicorn.Config(
app,
host='0.0.0.0',
port=app_config.listen_port,
log_config=app.get_uvicorn_log_config(),
timeout_keep_alive=keep_alive_timeout,
)

# Register the functions only if the app is running interactively.
Expand Down
190 changes: 180 additions & 10 deletions singlestoredb/functions/ext/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"""
import argparse
import asyncio
import atexit
import contextvars
import dataclasses
import datetime
Expand Down Expand Up @@ -113,6 +114,159 @@ async def to_thread(
return await loop.run_in_executor(None, func_call)


async def _poll_cancel(cancel_event: threading.Event) -> None:
    """
    Wait, on the running event loop, until ``cancel_event`` has been set.

    A ``threading.Event`` cannot be awaited directly, so the flag is sampled
    every 100 ms and control is handed back to the loop between samples.
    This turns the cross-thread cancellation signal into something a
    coroutine can race against; ``_cancellable_run`` runs it as a sibling
    task next to the UDF coroutine.
    """
    while True:
        if cancel_event.is_set():
            return
        # Yield to the loop so other tasks keep running between samples.
        await asyncio.sleep(0.1)


async def _cancellable_run(
    cancel_event: threading.Event,
    coro: Any,
) -> Any:
    """
    Run ``coro`` but abandon it if ``cancel_event`` is tripped.

    The coroutine is wrapped in a task and raced against ``_poll_cancel``;
    whichever finishes first wins. If the cancel signal wins (including a
    tie), the coroutine's task is cancelled and ``CancelledError`` is raised;
    otherwise the task's result is returned or its exception is propagated.

    This is the authoritative cancellation path for async UDFs: they run on
    the shared dispatch loop, where ordinary task cancellation from the
    request loop does not reach them.

    Both helper tasks are always cleaned up, even when this coroutine itself
    is cancelled while waiting. ``asyncio.wait`` does not cancel the
    awaitables it was given, so without the ``finally`` below an outer
    cancellation would leave the UDF task (and the poller) running.
    """
    task = asyncio.create_task(coro)
    cancel_check = asyncio.create_task(_poll_cancel(cancel_event))
    try:
        done, _ = await asyncio.wait(
            [task, cancel_check], return_when=asyncio.FIRST_COMPLETED,
        )
        if cancel_check in done:
            raise asyncio.CancelledError()
        return task.result()
    finally:
        # Runs on normal completion, on the cancel signal, and when this
        # coroutine is cancelled from outside while awaiting the race.
        cancel_check.cancel()
        if task.done():
            # The task finished but its outcome may be discarded (cancel
            # signal won a tie); mark any exception as retrieved so asyncio
            # does not log "Task exception was never retrieved".
            if not task.cancelled():
                task.exception()
        else:
            task.cancel()
Comment thread
cursor[bot] marked this conversation as resolved.


# Dedicated event loop used for ALL async UDF requests.
#
# Async UDFs commonly create resources bound to the event loop they are
# first used on (httpx pools, async DB clients, anyio streams, ...). Routing
# every async UDF onto one dedicated loop lets those resources be reused
# safely across requests and avoids the "bound to a different event loop"
# errors seen when requests land on different ad-hoc worker threads.
# ``run_coroutine_threadsafe`` schedules each coroutine immediately, so
# requests run concurrently rather than queuing behind in-flight ones.
#
# Sync UDFs instead run in a worker thread (one ``asyncio.run`` per call):
# a sync UDF would block this shared loop and starve other async requests.

# The shared loop itself; ``None`` until first use and again after shutdown.
_async_dispatch_loop: 'Optional[asyncio.AbstractEventLoop]' = None
# The daemon thread that drives the loop with ``run_forever``.
_async_dispatch_thread: 'Optional[threading.Thread]' = None
# Serializes creation and teardown of the loop/thread pair.
_async_dispatch_lock = threading.Lock()


def _get_async_dispatch_loop() -> asyncio.AbstractEventLoop:
    """
    Return (lazily creating) the singleton async-dispatch event loop.

    The loop is created and driven by a dedicated daemon thread that sits in
    ``run_forever`` until the loop is stopped. The first caller creates the
    thread under ``_async_dispatch_lock`` and waits for the loop to exist;
    later callers take the lock-free fast path.

    Raises ``RuntimeError`` (chained to the underlying error) if the dispatch
    thread fails to create its event loop. Without that hand-off the caller
    would wait forever on the readiness event while holding the lock.
    """
    global _async_dispatch_loop, _async_dispatch_thread

    # Fast path: an open loop already exists, no locking needed.
    loop = _async_dispatch_loop
    if loop is not None and not loop.is_closed():
        return loop

    with _async_dispatch_lock:
        # Re-check under the lock: another thread may have created the loop
        # while this one was waiting to acquire it.
        if _async_dispatch_loop is not None and \
                not _async_dispatch_loop.is_closed():
            return _async_dispatch_loop

        ready = threading.Event()
        captured: List[asyncio.AbstractEventLoop] = []
        failures: List[BaseException] = []

        def run_loop() -> None:
            try:
                new_loop = asyncio.new_event_loop()
                asyncio.set_event_loop(new_loop)
            except BaseException as exc:
                # Report the failure to the waiting caller instead of
                # leaving it blocked on ``ready`` forever.
                failures.append(exc)
                ready.set()
                return
            captured.append(new_loop)
            ready.set()
            try:
                new_loop.run_forever()
            finally:
                # Best-effort teardown once the loop has been stopped; each
                # step is independent, so one failure must not skip the rest.
                try:
                    new_loop.run_until_complete(new_loop.shutdown_asyncgens())
                except Exception:
                    pass
                try:
                    new_loop.run_until_complete(
                        new_loop.shutdown_default_executor(),
                    )
                except Exception:
                    pass
                try:
                    new_loop.close()
                except Exception:
                    pass

        thread = threading.Thread(
            target=run_loop,
            name='singlestoredb-udf-async-dispatch',
            daemon=True,
        )
        thread.start()
        ready.wait()

        if failures:
            raise RuntimeError(
                'Could not start the async UDF dispatch loop',
            ) from failures[0]

        _async_dispatch_loop = captured[0]
        _async_dispatch_thread = thread
        return _async_dispatch_loop


async def _dispatch_to_async_loop(coro: Any) -> Any:
    """
    Run ``coro`` on the dedicated async-dispatch loop and await its outcome.

    The coroutine is submitted with ``run_coroutine_threadsafe`` and the
    resulting concurrent future is bridged back into the calling loop. If
    the awaiting task is cancelled, the scheduled work is cancelled too on a
    best-effort basis; ``cancel_event`` (observed by ``_cancellable_run``
    from inside the dispatch loop) remains the authoritative cancellation
    signal.
    """
    dispatch_loop = _get_async_dispatch_loop()
    scheduled = asyncio.run_coroutine_threadsafe(coro, dispatch_loop)
    bridged = asyncio.wrap_future(scheduled)
    try:
        outcome = await bridged
    except asyncio.CancelledError:
        # Propagate the cancellation to the work running on the other loop.
        scheduled.cancel()
        raise
    return outcome


def _shutdown_async_dispatch_loop() -> None:
    """
    Stop the dedicated async-dispatch loop and wait briefly for its thread.

    Cleanup is best-effort: a failure to schedule the stop is ignored, and
    the thread is joined for at most five seconds.
    """
    global _async_dispatch_loop, _async_dispatch_thread

    # Detach the loop/thread pair under the lock so a concurrent caller of
    # the getter sees "no loop" rather than one that is being torn down.
    with _async_dispatch_lock:
        loop, thread = _async_dispatch_loop, _async_dispatch_thread
        _async_dispatch_loop = _async_dispatch_thread = None

    if loop is not None and not loop.is_closed():
        try:
            # ``stop`` must be invoked on the loop's own thread.
            loop.call_soon_threadsafe(loop.stop)
        except Exception:
            pass

    if thread is None:
        return
    thread.join(timeout=5)


# Stop the dispatch loop and join its thread (bounded wait) at interpreter
# exit.
atexit.register(_shutdown_async_dispatch_loop)


# Use negative values to indicate unsigned ints / binary data / usec time precision
rowdat_1_type_map = {
'bool': ft.LONGLONG,
Expand Down Expand Up @@ -1195,15 +1349,26 @@ async def __call__(
func_info['colspec'], b''.join(data),
)

func_task = asyncio.create_task(
func(cancel_event, call_timer, *inputs)
if func_info['is_async']
else to_thread(
lambda: asyncio.run(
func(cancel_event, call_timer, *inputs),
# Async UDFs run on the dedicated dispatch loop; sync UDFs run
# in a worker thread (one asyncio.run per call) so they cannot
# block that shared loop (see the module-level notes above).
if func_info.get('is_async'):
func_task = asyncio.create_task(
_dispatch_to_async_loop(
_cancellable_run(
cancel_event,
func(cancel_event, call_timer, *inputs),
),
),
),
)
)
else:
func_task = asyncio.create_task(
to_thread(
lambda: asyncio.run(
func(cancel_event, call_timer, *inputs),
),
),
)
disconnect_task = asyncio.create_task(
asyncio.sleep(int(1e9))
if ignore_cancel else cancel_on_disconnect(receive),
Expand All @@ -1219,17 +1384,21 @@ async def __call__(
all_tasks, return_when=asyncio.FIRST_COMPLETED,
)

# Signal cancellation before awaiting: cancelling func_task
# only unwinds its asyncio wrapper on this loop, not the work
# running off-thread; cancel_event is what actually reaches it.
if func_task in pending:
cancel_event.set()
Comment thread
cursor[bot] marked this conversation as resolved.

await cancel_all_tasks(pending)

for task in done:
if task is disconnect_task:
cancel_event.set()
raise asyncio.CancelledError(
'Function call was cancelled by client disconnect',
)

elif task is timeout_task:
cancel_event.set()
raise asyncio.TimeoutError(
'Function call was cancelled due to timeout',
)
Expand Down Expand Up @@ -1292,6 +1461,7 @@ async def __call__(
await send(self.error_response_dict)

finally:
cancel_event.set()
await cancel_all_tasks(all_tasks)

# Handle api reflection
Expand Down
Loading
Loading