diff --git a/src/py/kaleido/_sync_server.py b/src/py/kaleido/_sync_server.py index 740bd902..50eb804a 100644 --- a/src/py/kaleido/_sync_server.py +++ b/src/py/kaleido/_sync_server.py @@ -6,7 +6,8 @@ from functools import partial from queue import Queue from threading import Thread -from typing import TYPE_CHECKING, NamedTuple +from typing import TYPE_CHECKING, NamedTuple, Any +from concurrent.futures import Future from .kaleido import Kaleido @@ -18,6 +19,7 @@ class Task(NamedTuple): fn: str args: Any kwargs: Any + future:Future[Any] class _BadFunctionName(BaseException): @@ -37,11 +39,10 @@ async def _server(self, *args, **kwargs): if not hasattr(k, task.fn): raise _BadFunctionName(f"Kaleido has no attribute {task.fn}") try: - self._return_queue.put( - await getattr(k, task.fn)(*task.args, **task.kwargs), - ) + result = await getattr(k, task.fn)(*task.args, **task.kwargs) + task.future.set_result(result) except Exception as e: # noqa: BLE001 - self._return_queue.put(e) + task.future.set_exception(e) self._task_queue.task_done() @@ -72,7 +73,6 @@ def open(self, *args: Any, silence_warnings=False, **kwargs: Any) -> None: daemon=True, ) self._task_queue: Queue[Task | None] = Queue() - self._return_queue: Queue[Any] = Queue() self._thread.start() self._initialized = True close = partial(self.close, silence_warnings=True) @@ -92,7 +92,6 @@ def close(self, *, silence_warnings=False): self._thread.join() del self._thread del self._task_queue - del self._return_queue self._initialized = False def call_function(self, cmd: str, *args: Any, **kwargs: Any): @@ -117,13 +116,9 @@ def call_function(self, cmd: str, *args: Any, **kwargs: Any): UserWarning, stacklevel=3, ) - self._task_queue.put(Task(cmd, args, kwargs)) - self._task_queue.join() - res = self._return_queue.get() - if isinstance(res, BaseException): - raise res - else: - return res + future:Future[Any] = Future() + self._task_queue.put(Task(cmd, args, kwargs,future)) + return future.result() def oneshot_async_run(