From 8c11e18a5a7d13239ae7d044f0a324bf23240521 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Yi=C4=9Fit=20Budak?= Date: Fri, 12 Jun 2026 09:56:23 +0300 Subject: [PATCH] Make sync wrapper fork-safe --- postmark/sync.py | 28 +++++++++++++++++++++++++--- tests/test_sync_client.py | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/postmark/sync.py b/postmark/sync.py index 7412951..412ee1a 100644 --- a/postmark/sync.py +++ b/postmark/sync.py @@ -21,6 +21,7 @@ import asyncio import inspect +import os import threading from typing import Optional @@ -32,12 +33,33 @@ class _EventLoopThread: """Persistent background thread running a dedicated event loop.""" def __init__(self): - self._loop = asyncio.new_event_loop() - t = threading.Thread(target=self._loop.run_forever, daemon=True) - t.start() + self._lock = threading.Lock() + self._loop = None + self._thread = None + self._pid = None + + def _ensure_started(self): + current_pid = os.getpid() + with self._lock: + if ( + self._pid == current_pid + and self._loop is not None + and not self._loop.is_closed() + and self._thread is not None + and self._thread.is_alive() + ): + return + + self._loop = asyncio.new_event_loop() + self._thread = threading.Thread( + target=self._loop.run_forever, daemon=True + ) + self._thread.start() + self._pid = current_pid def run(self, coro): """Submit a coroutine and block the calling thread until it returns or raises.""" + self._ensure_started() return asyncio.run_coroutine_threadsafe(coro, self._loop).result() diff --git a/tests/test_sync_client.py b/tests/test_sync_client.py index 171e80b..145d936 100644 --- a/tests/test_sync_client.py +++ b/tests/test_sync_client.py @@ -1,5 +1,8 @@ """Tests for postmark.sync — synchronous wrapper around the async clients.""" +import os +import select +import signal from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -262,3 +265,39 @@ async def echo(x): result = postmark.sync._loop.run(echo("hello")) assert result == "hello" + + @pytest.mark.skipif(not hasattr(os, "fork"), reason="requires os.fork") + def test_module_loop_recovers_after_fork(self): + async def echo(value): + return value + + assert postmark.sync._loop.run(echo("parent")) == "parent" + + read_fd, write_fd = os.pipe() + pid = os.fork() + if pid == 0: + os.close(read_fd) + try: + result = postmark.sync._loop.run(echo("child")) + os.write(write_fd, result.encode()) + except Exception as exc: + os.write(write_fd, f"ERROR:{exc}".encode()) + finally: + os.close(write_fd) + os._exit(0) + + os.close(write_fd) + try: + ready, _, _ = select.select([read_fd], [], [], 2) + if not ready: + os.kill(pid, signal.SIGKILL) + os.waitpid(pid, 0) + pytest.fail("module event loop hung after fork") + + payload = os.read(read_fd, 1024).decode() + _, status = os.waitpid(pid, 0) + finally: + os.close(read_fd) + + assert status == 0 + assert payload == "child"