diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index a676e3338..8576f50b9 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -683,10 +683,12 @@ impl WorkerRef { .map_err(|err| PyValueError::new_err(format!("Failed replacing client: {err}"))) } - fn initiate_shutdown(&self) -> PyResult<()> { + fn initiate_shutdown<'p>(&self, py: Python<'p>) -> PyResult> { let worker = self.worker.as_ref().unwrap().clone(); - worker.initiate_shutdown(); - Ok(()) + self.runtime.future_into_py(py, async move { + worker.initiate_shutdown().await; + Ok(()) + }) } fn finalize_shutdown<'p>(&mut self, py: Python<'p>) -> PyResult> { diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index a9c857373..f928f0b47 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -265,9 +265,9 @@ def replace_client(self, client: temporalio.bridge.client.Client) -> None: """Replace the worker client.""" self._ref.replace_client(client._ref) # type: ignore[reportOptionalMemberAccess] - def initiate_shutdown(self) -> None: + async def initiate_shutdown(self) -> None: """Start shutdown of the worker.""" - self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess] + await self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess] async def finalize_shutdown(self) -> None: """Finalize the worker. diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 508d5f708..1ba678640 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -390,7 +390,7 @@ async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]: # We must shutdown here try: if bridge_worker_scope is not None: - bridge_worker_scope.initiate_shutdown() + await bridge_worker_scope.initiate_shutdown() await bridge_worker_scope.finalize_shutdown() except Exception: logger.warning("Failed to finalize shutdown", exc_info=True) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 332e2ead7..5c8006c0c 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -837,7 +837,7 @@ async def raise_on_shutdown(): ) # Initiate core worker shutdown - self._bridge_worker.initiate_shutdown() + await self._bridge_worker.initiate_shutdown() # If any worker task had an exception, replace that task with a queue drain for worker, task in tasks.items(): diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index fb104b414..9eb4d82e3 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -622,7 +622,7 @@ async def _handle_cache_eviction( except Exception as e: self._throw_after_activation = e logger.debug("Shutting down worker on eviction hook exception") - self._bridge_worker().initiate_shutdown() + await self._bridge_worker().initiate_shutdown() def _create_workflow_instance( self, diff --git a/tests/__init__.py b/tests/__init__.py index 5af71def3..957ad45e5 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1 +1 @@ -DEV_SERVER_DOWNLOAD_VERSION = "v1.6.1-server-1.31.0-151.0" +DEV_SERVER_DOWNLOAD_VERSION = "v1.6.2-server-1.31.0-151.6" diff --git a/tests/conftest.py b/tests/conftest.py index c813f91f9..92a1455db 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -124,6 +124,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]: "history.enableChasm=true", "--dynamic-config-value", "history.enableTransitionHistory=true", + "--dynamic-config-value", + "frontend.enableCancelWorkerPollsOnShutdown=true", ], dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION, )