From 1d9635fdd0e2cb1e95b2ab0628977098326a6ae5 Mon Sep 17 00:00:00 2001 From: akshayjadiyanv Date: Tue, 26 May 2026 15:27:04 -0700 Subject: [PATCH 1/3] Add embedded NVIDIA Dynamo support to vLLM ModelHandler VLLMCompletionsModelHandler and VLLMChatModelHandler gain two keyword-only parameters, use_dynamo (default False) and dynamo_frontend_kwargs. When use_dynamo=True, the handler launches a dynamo.frontend process as the OpenAI-compatible local endpoint plus a separate dynamo.vllm worker, instead of vllm.entrypoints.openai.api_server. The existing native-vLLM path is unchanged when the flag is absent. The example pipeline vllm_text_completion.py gains --use_dynamo and --max_tokens flags. validate_inference_args is now a no-op on both handlers so OpenAI-style request kwargs (e.g. max_tokens) can be passed through RunInference. A new unit-test module covers process-launch behaviour for both paths. This supersedes #36966 (now closed) and rebases the embedded-Dynamo approach onto current master, preserving the recent batching-kwargs additions to the ModelHandler base. Co-authored-by: Danny McCormick --- .../inference/vllm_text_completion.py | 24 +- .../ml/inference/vllm_inference.py | 314 +++++++++++++++--- .../ml/inference/vllm_inference_test.py | 157 +++++++++ .../python/test-suites/dataflow/common.gradle | 20 ++ 4 files changed, 472 insertions(+), 43 deletions(-) create mode 100644 sdks/python/apache_beam/ml/inference/vllm_inference_test.py diff --git a/sdks/python/apache_beam/examples/inference/vllm_text_completion.py b/sdks/python/apache_beam/examples/inference/vllm_text_completion.py index a7468f521ebb..21f0d41d3cb2 100644 --- a/sdks/python/apache_beam/examples/inference/vllm_text_completion.py +++ b/sdks/python/apache_beam/examples/inference/vllm_text_completion.py @@ -138,6 +138,20 @@ def parse_known_args(argv): 'Passed to the vLLM OpenAI server as --gpu-memory-utilization ' '(fraction of total GPU memory for KV cache). Lower this if the ' 'engine fails to start with CUDA out of memory.')) + parser.add_argument( + '--use_dynamo', + dest='use_dynamo', + action='store_true', + help=( + 'Use embedded NVIDIA Dynamo as the vLLM engine. Requires ' + 'ai-dynamo[vllm] and the etcd binary in the runtime environment. ' + 'See VLLMCompletionsModelHandler for limitations of embedded mode.')) + parser.add_argument( + '--max_tokens', + dest='max_tokens', + type=int, + default=16, + help='Maximum number of tokens to generate for each example.') return parser.parse_known_args(argv) @@ -178,14 +192,17 @@ def run( build_vllm_server_kwargs(known_args)) model_handler = VLLMCompletionsModelHandler( - model_name=known_args.model, vllm_server_kwargs=effective_vllm_kwargs) + model_name=known_args.model, + vllm_server_kwargs=effective_vllm_kwargs, + use_dynamo=known_args.use_dynamo) input_examples = COMPLETION_EXAMPLES if known_args.chat: model_handler = VLLMChatModelHandler( model_name=known_args.model, chat_template_path=known_args.chat_template, - vllm_server_kwargs=dict(effective_vllm_kwargs)) + vllm_server_kwargs=dict(effective_vllm_kwargs), + use_dynamo=known_args.use_dynamo) input_examples = CHAT_EXAMPLES pipeline = test_pipeline @@ -193,7 +210,8 @@ def run( pipeline = beam.Pipeline(options=pipeline_options) examples = pipeline | "Create examples" >> beam.Create(input_examples) - predictions = examples | "RunInference" >> RunInference(model_handler) + predictions = examples | "RunInference" >> RunInference( + model_handler, inference_args={'max_tokens': known_args.max_tokens}) process_output = predictions | "Process Predictions" >> beam.ParDo( PostProcessor()) _ = process_output | "WriteOutput" >> beam.io.WriteToText( diff --git a/sdks/python/apache_beam/ml/inference/vllm_inference.py b/sdks/python/apache_beam/ml/inference/vllm_inference.py index 38283f1efd42..56ba0f71c122 100644 --- a/sdks/python/apache_beam/ml/inference/vllm_inference.py +++ b/sdks/python/apache_beam/ml/inference/vllm_inference.py @@ -20,10 +20,12 @@ import asyncio import logging import os +import shutil import subprocess import sys import threading import time +import urllib.request import uuid from collections.abc import Callable from collections.abc import Iterable @@ -109,36 +111,200 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI: ) +# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery, +# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing, +# disaggregated prefill/decode, and the Planner are not active in this mode. +_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = { + 'discovery-backend': 'etcd', + 'request-plane': 'tcp', + 'event-plane': 'zmq', + 'router-mode': 'round-robin', + 'no-router-kv-events': None, +} + +_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = { + 'discovery-backend': 'etcd', + 'request-plane': 'tcp', + 'event-plane': 'zmq', + 'kv-events-config': '{"enable_kv_cache_events": false}', +} + + +def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None: + for k, v in kwargs.items(): + cmd.append(f'--{k}') + # Only add values for commands with value part. + if v is not None: + cmd.append(v) + + +def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool: + return kwargs.get('discovery-backend') == 'etcd' + + class _VLLMModelServer(): - def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]): + def __init__( + self, + model_name: str, + vllm_server_kwargs: dict[str, Optional[str]], + dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None, + use_dynamo: bool = False): self._model_name = model_name self._vllm_server_kwargs = vllm_server_kwargs + self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {} self._server_started = False self._server_process = None + self._dynamo_process = None + self._etcd_process = None + self._managed_etcd_endpoint = None self._server_port: int = -1 self._server_process_lock = threading.RLock() + self._use_dynamo = use_dynamo self.start_server() + @staticmethod + def _stop_process(process: Optional[subprocess.Popen]) -> None: + if process is None or process.poll() is not None: + return + process.terminate() + try: + process.wait(timeout=10) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + + def _stop_processes(self) -> None: + self._stop_process(self._dynamo_process) + self._stop_process(self._server_process) + self._stop_process(self._etcd_process) + if (self._managed_etcd_endpoint is not None and + os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint): + del os.environ['ETCD_ENDPOINTS'] + self._dynamo_process = None + self._server_process = None + self._etcd_process = None + self._managed_etcd_endpoint = None + self._server_started = False + self._server_port = -1 + + def _process_status(self) -> str: + process_status = [] + if self._server_process is not None: + process_status.append( + 'frontend/server exit code: %s' % self._server_process.poll()) + if self._dynamo_process is not None: + process_status.append( + 'dynamo worker exit code: %s' % self._dynamo_process.poll()) + if self._etcd_process is not None: + process_status.append('etcd exit code: %s' % self._etcd_process.poll()) + return ', '.join(process_status) or 'no process status available' + + def __del__(self): + self._stop_processes() + + def _uses_embedded_etcd(self) -> bool: + return ( + self._use_dynamo and + _uses_etcd_discovery(self._dynamo_frontend_kwargs) and + _uses_etcd_discovery(self._vllm_server_kwargs) and + 'ETCD_ENDPOINTS' not in os.environ) + + def _wait_for_etcd(self, endpoint: str, timeout_secs=30) -> None: + deadline = time.time() + timeout_secs + health_url = endpoint.rstrip('/') + '/health' + while time.time() < deadline and self._etcd_process.poll() is None: + try: + with urllib.request.urlopen(health_url, timeout=2) as response: + if response.status < 500: + return + except Exception: # pylint: disable=broad-except + time.sleep(1) + + process_status = self._process_status() + self._stop_processes() + raise RuntimeError( + "Failed to start embedded etcd for Dynamo. Process status: " + f"{process_status}. Install etcd in the worker container or set " + "ETCD_ENDPOINTS to an external etcd service.") + + def _ensure_etcd(self) -> None: + if not self._uses_embedded_etcd(): + return + if shutil.which('etcd') is None: + raise RuntimeError( + "Embedded Dynamo mode requires etcd when ETCD_ENDPOINTS is not " + "set. Install etcd in the worker container or set ETCD_ENDPOINTS " + "to an external etcd service.") + + etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}' + etcd_data_dir = f'/tmp/{etcd_name}' + peer_port, = subprocess_server.pick_port(None) + etcd_cmd = [ + 'etcd', + '--name', + etcd_name, + '--listen-client-urls', + 'http://127.0.0.1:{{PORT}}', + '--advertise-client-urls', + 'http://127.0.0.1:{{PORT}}', + '--listen-peer-urls', + f'http://127.0.0.1:{peer_port}', + '--initial-advertise-peer-urls', + f'http://127.0.0.1:{peer_port}', + '--initial-cluster', + f'{etcd_name}=http://127.0.0.1:{peer_port}', + '--data-dir', + etcd_data_dir, + '--log-level', + 'warn', + ] + self._etcd_process, etcd_port = start_process(etcd_cmd) + endpoint = f'http://127.0.0.1:{etcd_port}' + os.environ['ETCD_ENDPOINTS'] = endpoint + self._managed_etcd_endpoint = endpoint + self._wait_for_etcd(endpoint) + def start_server(self, retries=3): with self._server_process_lock: if not self._server_started: - server_cmd = [ - sys.executable, - '-m', - 'vllm.entrypoints.openai.api_server', - '--model', - self._model_name, - '--port', - '{{PORT}}', - ] - for k, v in self._vllm_server_kwargs.items(): - server_cmd.append(f'--{k}') - # Only add values for commands with value part. - if v is not None: - server_cmd.append(v) + self._stop_processes() + self._ensure_etcd() + if self._use_dynamo: + # Dynamo embedded mode uses the frontend as its OpenAI-compatible + # local endpoint and a separate vLLM worker process. + server_cmd = [ + sys.executable, + '-m', + 'dynamo.frontend', + '--http-port', + '{{PORT}}', + ] + _append_kwargs(server_cmd, self._dynamo_frontend_kwargs) + else: + server_cmd = [ + sys.executable, + '-m', + 'vllm.entrypoints.openai.api_server', + '--model', + self._model_name, + '--port', + '{{PORT}}', + ] + _append_kwargs(server_cmd, self._vllm_server_kwargs) self._server_process, self._server_port = start_process(server_cmd) + if self._use_dynamo: + server_cmd = [ + sys.executable, + '-m', + 'dynamo.vllm', + '--model', + self._model_name, + ] + _append_kwargs(server_cmd, self._vllm_server_kwargs) + self._dynamo_process, _ = start_process(server_cmd) + self.check_connectivity(retries) def get_server_port(self) -> int: @@ -146,9 +312,13 @@ def get_server_port(self) -> int: self.start_server() return self._server_port - def check_connectivity(self, retries=3): + def check_connectivity(self, retries=3, timeout_secs=600): + start_time = time.time() with getVLLMClient(self._server_port) as client: - while self._server_process.poll() is None: + while (time.time() - start_time < timeout_secs and + self._server_process.poll() is None and + (self._dynamo_process is None or + self._dynamo_process.poll() is None)): try: models = client.models.list().data logging.info('models: %s' % models) @@ -160,12 +330,13 @@ def check_connectivity(self, retries=3): # Sleep while bringing up the process time.sleep(5) + process_status = self._process_status() + self._stop_processes() if retries == 0: - self._server_started = False raise Exception( - "Failed to start vLLM server, polling process exited with code " + - "%s. Next time a request is tried, the server will be restarted" % - self._server_process.poll()) + "Failed to start vLLM server. Process status: " + f"{process_status}. Next time a request is tried, the server " + "will be restarted") else: self.start_server(retries - 1) @@ -176,8 +347,10 @@ class VLLMCompletionsModelHandler(ModelHandler[str, def __init__( self, model_name: str, - vllm_server_kwargs: Optional[dict[str, str]] = None, + vllm_server_kwargs: Optional[dict[str, Optional[str]]] = None, *, + use_dynamo: bool = False, + dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, @@ -197,15 +370,28 @@ def __init__( https://docs.vllm.ai/en/latest/models/supported_models.html for supported models. vllm_server_kwargs: Any additional kwargs to be passed into your vllm - server when it is being created. Will be invoked using - `python -m vllm.entrypoints.openai.api_serverv - `. For example, you could pass - `{'echo': 'true'}` to prepend new messages with the previous message. - On ~16GB GPUs, pass lower ``max-num-seqs`` and - ``gpu-memory-utilization`` values (see - ``apache_beam.examples.inference.vllm_text_completion``). For a list of - possible kwargs, see + server when it is being created. When ``use_dynamo`` is disabled, + this is invoked using ``python -m vllm.entrypoints.openai.api_server + ``. When ``use_dynamo`` is + enabled, these kwargs are passed to the ``dynamo.vllm`` worker + process. For example, you could pass ``{'echo': 'true'}`` to prepend + new messages with the previous message. On ~16GB GPUs, pass lower + ``max-num-seqs`` and ``gpu-memory-utilization`` values (see + ``apache_beam.examples.inference.vllm_text_completion``). For a list + of possible kwargs, see https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html#extra-parameters-for-completions-api + use_dynamo: Whether to use NVIDIA Dynamo as the underlying vLLM engine. + Requires installing Dynamo in your runtime environment + (``pip install ai-dynamo[vllm]``). This is an opt-in single-worker + embedded mode; KV-aware routing, disaggregated prefill/decode, KVBM + offload across nodes, the Planner, and Grove are not active in + embedded mode. Dynamo also requires an etcd-style discovery service: + when ``ETCD_ENDPOINTS`` is unset, Beam starts a local etcd, which + requires the ``etcd`` binary in the worker environment. + dynamo_frontend_kwargs: Additional kwargs to be passed to the + ``dynamo.frontend`` process when ``use_dynamo`` is enabled. By + default, embedded Dynamo uses etcd discovery, TCP request plane, ZMQ + event plane, round-robin routing, and disables router KV events. min_batch_size: optional. the minimum batch size to use when batching inputs. max_batch_size: optional. the maximum batch size to use when batching @@ -229,10 +415,20 @@ def __init__( batch_length_fn=batch_length_fn, batch_bucket_boundaries=batch_bucket_boundaries) self._model_name = model_name - self._vllm_server_kwargs: dict[str, str] = vllm_server_kwargs or {} + self._vllm_server_kwargs: dict[str, Optional[str]] = ({ + **_DYNAMO_ENGINE_DEFAULT_KWARGS, **(vllm_server_kwargs or {}) + } if use_dynamo else vllm_server_kwargs or {}) + self._dynamo_frontend_kwargs: dict[str, Optional[str]] = { + **_DYNAMO_FRONTEND_DEFAULT_KWARGS, **(dynamo_frontend_kwargs or {}) + } + self._use_dynamo = use_dynamo def load_model(self) -> _VLLMModelServer: - return _VLLMModelServer(self._model_name, self._vllm_server_kwargs) + return _VLLMModelServer( + self._model_name, + self._vllm_server_kwargs, + self._dynamo_frontend_kwargs, + self._use_dynamo) async def _async_run_inference( self, @@ -274,6 +470,12 @@ def run_inference( """ return asyncio.run(self._async_run_inference(batch, model, inference_args)) + def validate_inference_args(self, inference_args: Optional[dict[str, Any]]): + # Override the base validator so OpenAI-compatible request kwargs such as + # ``max_tokens`` can be passed through ``RunInference`` to the vLLM / + # Dynamo server. + pass + def share_model_across_processes(self) -> bool: return True @@ -285,8 +487,10 @@ def __init__( self, model_name: str, chat_template_path: Optional[str] = None, - vllm_server_kwargs: Optional[dict[str, str]] = None, + vllm_server_kwargs: Optional[dict[str, Optional[str]]] = None, *, + use_dynamo: bool = False, + dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, @@ -311,12 +515,26 @@ def __init__( For info on chat templates, see: https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html#chat-template vllm_server_kwargs: Any additional kwargs to be passed into your vllm - server when it is being created. Will be invoked using - `python -m vllm.entrypoints.openai.api_serverv - `. For example, you could pass - `{'echo': 'true'}` to prepend new messages with the previous message. - For a list of possible kwargs, see + server when it is being created. When ``use_dynamo`` is disabled, + this is invoked using ``python -m vllm.entrypoints.openai.api_server + ``. When ``use_dynamo`` is + enabled, these kwargs are passed to the ``dynamo.vllm`` worker + process. For example, you could pass ``{'echo': 'true'}`` to prepend + new messages with the previous message. For a list of possible + kwargs, see https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html#extra-parameters-for-chat-api + use_dynamo: Whether to use NVIDIA Dynamo as the underlying vLLM engine. + Requires installing Dynamo in your runtime environment + (``pip install ai-dynamo[vllm]``). This is an opt-in single-worker + embedded mode; KV-aware routing, disaggregated prefill/decode, KVBM + offload across nodes, the Planner, and Grove are not active in + embedded mode. Dynamo also requires an etcd-style discovery service: + when ``ETCD_ENDPOINTS`` is unset, Beam starts a local etcd, which + requires the ``etcd`` binary in the worker environment. + dynamo_frontend_kwargs: Additional kwargs to be passed to the + ``dynamo.frontend`` process when ``use_dynamo`` is enabled. By + default, embedded Dynamo uses etcd discovery, TCP request plane, ZMQ + event plane, round-robin routing, and disables router KV events. min_batch_size: optional. the minimum batch size to use when batching inputs. max_batch_size: optional. the maximum batch size to use when batching @@ -340,9 +558,15 @@ def __init__( batch_length_fn=batch_length_fn, batch_bucket_boundaries=batch_bucket_boundaries) self._model_name = model_name - self._vllm_server_kwargs: dict[str, str] = vllm_server_kwargs or {} + self._vllm_server_kwargs: dict[str, Optional[str]] = ({ + **_DYNAMO_ENGINE_DEFAULT_KWARGS, **(vllm_server_kwargs or {}) + } if use_dynamo else vllm_server_kwargs or {}) + self._dynamo_frontend_kwargs: dict[str, Optional[str]] = { + **_DYNAMO_FRONTEND_DEFAULT_KWARGS, **(dynamo_frontend_kwargs or {}) + } self._chat_template_path = chat_template_path self._chat_file = f'template-{uuid.uuid4().hex}.jinja' + self._use_dynamo = use_dynamo def load_model(self) -> _VLLMModelServer: chat_template_contents = '' @@ -355,7 +579,11 @@ def load_model(self) -> _VLLMModelServer: f.write(chat_template_contents) self._vllm_server_kwargs['chat_template'] = local_chat_template_path - return _VLLMModelServer(self._model_name, self._vllm_server_kwargs) + return _VLLMModelServer( + self._model_name, + self._vllm_server_kwargs, + self._dynamo_frontend_kwargs, + self._use_dynamo) async def _async_run_inference( self, @@ -400,5 +628,11 @@ def run_inference( """ return asyncio.run(self._async_run_inference(batch, model, inference_args)) + def validate_inference_args(self, inference_args: Optional[dict[str, Any]]): + # Override the base validator so OpenAI-compatible request kwargs such as + # ``max_tokens`` can be passed through ``RunInference`` to the vLLM / + # Dynamo server. + pass + def share_model_across_processes(self) -> bool: return True diff --git a/sdks/python/apache_beam/ml/inference/vllm_inference_test.py b/sdks/python/apache_beam/ml/inference/vllm_inference_test.py new file mode 100644 index 000000000000..4ff5186178e3 --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/vllm_inference_test.py @@ -0,0 +1,157 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import sys +import types +import unittest +from unittest import mock + +# Protect against environments where the OpenAI python library is not +# available. The command-construction tests below do not actually need a +# real OpenAI client; stubbing the module is enough for vllm_inference to +# import cleanly. +# pylint: disable=wrong-import-order, wrong-import-position +try: + import openai # pylint: disable=unused-import +except ImportError: + openai = types.ModuleType('openai') + + class _FakeOpenAI: + pass + + openai.AsyncOpenAI = _FakeOpenAI + openai.OpenAI = _FakeOpenAI + sys.modules['openai'] = openai + +from apache_beam.ml.inference import vllm_inference + + +class _FakeProcess: + def __init__(self): + self.returncode = None + + def poll(self): + return self.returncode + + def terminate(self): + self.returncode = 0 + + def wait(self, timeout=None): + return self.returncode + + def kill(self): + self.returncode = -9 + + +class _FakeModels: + def list(self): + return types.SimpleNamespace(data=[object()]) + + +class _FakeClient: + def __init__(self): + self.models = _FakeModels() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + return False + + +def _record_start_process(commands): + def start_process(cmd): + commands.append(list(cmd)) + return _FakeProcess(), 10000 + len(commands) + + return start_process + + +class VLLMInferenceTest(unittest.TestCase): + def test_native_vllm_starts_single_server_process(self): + commands = [] + with mock.patch.object(vllm_inference, + 'start_process', + _record_start_process(commands)): + with mock.patch.object(vllm_inference, 'getVLLMClient'): + vllm_inference.getVLLMClient.return_value = _FakeClient() + vllm_inference.VLLMCompletionsModelHandler( + model_name='test-model', + vllm_server_kwargs={ + 'gpu-memory-utilization': '0.9' + }).load_model() + self.assertEqual(1, len(commands)) + self.assertIn('vllm.entrypoints.openai.api_server', commands[0]) + self.assertIn('--model', commands[0]) + self.assertIn('test-model', commands[0]) + self.assertIn('--gpu-memory-utilization', commands[0]) + self.assertIn('0.9', commands[0]) + self.assertNotIn('dynamo.frontend', commands[0]) + self.assertNotIn('dynamo.vllm', commands[0]) + + def test_dynamo_starts_frontend_and_engine_with_separate_kwargs(self): + commands = [] + with mock.patch.dict(os.environ, + {'ETCD_ENDPOINTS': 'http://127.0.0.1:2379'}): + with mock.patch.object(vllm_inference, + 'start_process', + _record_start_process(commands)): + with mock.patch.object(vllm_inference, 'getVLLMClient'): + vllm_inference.getVLLMClient.return_value = _FakeClient() + vllm_inference.VLLMCompletionsModelHandler( + model_name='test-model', + vllm_server_kwargs={ + 'tensor-parallel-size': '1' + }, + use_dynamo=True, + dynamo_frontend_kwargs={ + 'router-mode': 'round-robin' + }).load_model() + self.assertEqual(2, len(commands)) + frontend_cmd = commands[0] + engine_cmd = commands[1] + self.assertIn('dynamo.frontend', frontend_cmd) + self.assertIn('--http-port', frontend_cmd) + self.assertIn('--discovery-backend', frontend_cmd) + self.assertIn('--request-plane', frontend_cmd) + self.assertIn('--event-plane', frontend_cmd) + self.assertIn('--router-mode', frontend_cmd) + self.assertIn('--no-router-kv-events', frontend_cmd) + self.assertNotIn('--model', frontend_cmd) + self.assertNotIn('--tensor-parallel-size', frontend_cmd) + self.assertNotIn('--kv-events-config', frontend_cmd) + self.assertIn('dynamo.vllm', engine_cmd) + self.assertIn('--model', engine_cmd) + self.assertIn('test-model', engine_cmd) + self.assertIn('--discovery-backend', engine_cmd) + self.assertIn('--request-plane', engine_cmd) + self.assertIn('--event-plane', engine_cmd) + self.assertIn('--kv-events-config', engine_cmd) + self.assertIn('--tensor-parallel-size', engine_cmd) + self.assertNotIn('--http-port', engine_cmd) + self.assertNotIn('--router-mode', engine_cmd) + self.assertNotIn('--no-router-kv-events', engine_cmd) + + def test_validate_inference_args_accepts_openai_request_kwargs(self): + vllm_inference.VLLMCompletionsModelHandler( + 'test-model').validate_inference_args({'max_tokens': 8}) + vllm_inference.VLLMChatModelHandler('test-model').validate_inference_args( + {'max_tokens': 8}) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index 0e5d0b01ee14..e4e25c851bc4 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -478,6 +478,26 @@ def vllmTests = tasks.create("vllmTests") { executable 'sh' args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --chat true --chat_template 'gs://apache-beam-ml/additional_files/sample_chat_template.jinja' --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'" } + // TODO(https://github.com/apache/beam/pull/36966): Add Dataflow IT + // coverage for the embedded Dynamo path. The example pipeline already + // supports --use_dynamo (see VLLMCompletionsModelHandler). Enabling this + // requires updating vllm.dockerfile.old to install etcd and + // ai-dynamo[vllm], and provisioning a GPU pool with enough memory for + // the model plus the Dynamo runtime (e.g. NVIDIA L4 on g2-standard-4). + // The embedded mode was validated end-to-end on a T4 VM with + // Qwen/Qwen3-0.6B via DirectRunner; this change is scoped to the + // runtime, example, and unit-test changes only. + // + // def dynamoArgMap = argMap + [ + // "machine_type": "g2-standard-4", + // "model": "Qwen/Qwen3-0.6B", + // "output": "gs://apache-beam-ml/outputs/vllm_dynamo_predictions.txt", + // ] + // def dynamoCmdArgs = mapToArgString(dynamoArgMap) + // exec { + // executable 'sh' + // args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $dynamoCmdArgs --use_dynamo --max_tokens 8 --experiment='worker_accelerator=type:nvidia-l4;count:1;install-nvidia-driver:5xx'" + // } } } From 329dd286e84055317c1e21e0ea81671f154c6f1f Mon Sep 17 00:00:00 2001 From: akshayjadiyanv Date: Wed, 27 May 2026 01:23:01 -0400 Subject: [PATCH 2/3] Harden _VLLMModelServer process lifecycle per code review Apply five robustness fixes flagged on PR #38701: - Track the temporary etcd data dir as self._etcd_data_dir and shutil.rmtree(..., ignore_errors=True) it in _stop_processes so worker restarts don't leak /tmp directories. - Wrap process.terminate() / process.wait() / process.kill() in a single try/except OSError to absorb the ProcessLookupError race when a process exits between poll() and the signal call. - Switch the ETCD_ENDPOINTS removal from `del os.environ[...]` to `os.environ.pop(..., None)` to be idempotent. - Wrap __del__ in try/except Exception so cleanup never raises during interpreter shutdown. - Add the embedded etcd process to the check_connectivity() poll loop so an etcd death fails fast instead of waiting out the 10-minute timeout. --- .../ml/inference/vllm_inference.py | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/vllm_inference.py b/sdks/python/apache_beam/ml/inference/vllm_inference.py index 56ba0f71c122..e5d8918d5ec5 100644 --- a/sdks/python/apache_beam/ml/inference/vllm_inference.py +++ b/sdks/python/apache_beam/ml/inference/vllm_inference.py @@ -156,6 +156,7 @@ def __init__( self._server_process = None self._dynamo_process = None self._etcd_process = None + self._etcd_data_dir: Optional[str] = None self._managed_etcd_endpoint = None self._server_port: int = -1 self._server_process_lock = threading.RLock() @@ -167,12 +168,18 @@ def __init__( def _stop_process(process: Optional[subprocess.Popen]) -> None: if process is None or process.poll() is not None: return - process.terminate() + # A process may exit between poll() and terminate() / kill(), in which + # case the OS raises ProcessLookupError (or another OSError). Treat that + # as already-stopped so we don't bail out of the broader cleanup. try: - process.wait(timeout=10) - except subprocess.TimeoutExpired: - process.kill() - process.wait() + process.terminate() + try: + process.wait(timeout=10) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + except OSError: + pass def _stop_processes(self) -> None: self._stop_process(self._dynamo_process) @@ -180,7 +187,10 @@ def _stop_processes(self) -> None: self._stop_process(self._etcd_process) if (self._managed_etcd_endpoint is not None and os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint): - del os.environ['ETCD_ENDPOINTS'] + os.environ.pop('ETCD_ENDPOINTS', None) + if self._etcd_data_dir is not None: + shutil.rmtree(self._etcd_data_dir, ignore_errors=True) + self._etcd_data_dir = None self._dynamo_process = None self._server_process = None self._etcd_process = None @@ -201,7 +211,13 @@ def _process_status(self) -> str: return ', '.join(process_status) or 'no process status available' def __del__(self): - self._stop_processes() + # __del__ may run during interpreter shutdown when module globals can + # already be torn down; swallow any cleanup failures so we don't print + # a noisy traceback. + try: + self._stop_processes() + except Exception: # pylint: disable=broad-except + pass def _uses_embedded_etcd(self) -> bool: return ( @@ -238,7 +254,7 @@ def _ensure_etcd(self) -> None: "to an external etcd service.") etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}' - etcd_data_dir = f'/tmp/{etcd_name}' + self._etcd_data_dir = f'/tmp/{etcd_name}' peer_port, = subprocess_server.pick_port(None) etcd_cmd = [ 'etcd', @@ -255,7 +271,7 @@ def _ensure_etcd(self) -> None: '--initial-cluster', f'{etcd_name}=http://127.0.0.1:{peer_port}', '--data-dir', - etcd_data_dir, + self._etcd_data_dir, '--log-level', 'warn', ] @@ -318,7 +334,8 @@ def check_connectivity(self, retries=3, timeout_secs=600): while (time.time() - start_time < timeout_secs and self._server_process.poll() is None and (self._dynamo_process is None or - self._dynamo_process.poll() is None)): + self._dynamo_process.poll() is None) and + (self._etcd_process is None or self._etcd_process.poll() is None)): try: models = client.models.list().data logging.info('models: %s' % models) From 6f8849169b2db3e7c466152f9d0295049da80e8b Mon Sep 17 00:00:00 2001 From: akshayjadiyanv Date: Wed, 27 May 2026 14:21:34 -0400 Subject: [PATCH 3/3] Enable Dataflow IT for embedded Dynamo on T4 Bump vllm.dockerfile.old to apache-beam[gcp]==2.71.0 (and the COPY-from beam_python3.12_sdk image to 2.71.0), install ai-dynamo[vllm], and add the etcd binary required by embedded Dynamo's runtime discovery. Uncomment the Dynamo IT block in common.gradle. Drop the unused machine_type override so it inherits n1-standard-4 from argMap, and switch nvidia-l4 -> nvidia-tesla-t4 to match the existing native vLLM ITs and the local Dataflow validation (per @damccorm review). Validated end-to-end on Dataflow with Qwen/Qwen3-0.6B; the nvext.timing field present on every PredictionResult confirms the Dynamo frontend served the requests. --- .../test_resources/vllm.dockerfile.old | 14 ++++++-- .../python/test-suites/dataflow/common.gradle | 33 ++++++++----------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/test_resources/vllm.dockerfile.old b/sdks/python/apache_beam/ml/inference/test_resources/vllm.dockerfile.old index b9c99e49e02f..d0080debc092 100644 --- a/sdks/python/apache_beam/ml/inference/test_resources/vllm.dockerfile.old +++ b/sdks/python/apache_beam/ml/inference/test_resources/vllm.dockerfile.old @@ -34,14 +34,22 @@ RUN python3 --version RUN apt-get install -y curl RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.12 && pip install --upgrade pip -RUN pip install --no-cache-dir -vvv apache-beam[gcp]==2.58.1 -RUN pip install openai vllm +RUN pip install --no-cache-dir -vvv apache-beam[gcp]==2.71.0 +RUN pip install --no-cache-dir openai vllm ai-dynamo[vllm] RUN apt install libcairo2-dev pkg-config python3-dev -y RUN pip install pycairo +# etcd binary required by embedded NVIDIA Dynamo for runtime discovery. +ENV ETCD_VERSION=v3.5.13 +RUN curl -L https://github.com/etcd-io/etcd/releases/download/${ETCD_VERSION}/etcd-${ETCD_VERSION}-linux-amd64.tar.gz -o /tmp/etcd.tar.gz && \ + tar xzf /tmp/etcd.tar.gz -C /tmp && \ + mv /tmp/etcd-${ETCD_VERSION}-linux-amd64/etcd /usr/local/bin/etcd && \ + chmod +x /usr/local/bin/etcd && \ + rm -rf /tmp/etcd* + # Copy the Apache Beam worker dependencies from the Beam Python 3.12 SDK image. -COPY --from=apache/beam_python3.12_sdk:2.58.1 /opt/apache/beam /opt/apache/beam +COPY --from=apache/beam_python3.12_sdk:2.71.0 /opt/apache/beam /opt/apache/beam # Set the entrypoint to Apache Beam SDK worker launcher. ENTRYPOINT [ "/opt/apache/beam/boot" ] \ No newline at end of file diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index e4e25c851bc4..10fceb799816 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -478,26 +478,19 @@ def vllmTests = tasks.create("vllmTests") { executable 'sh' args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --chat true --chat_template 'gs://apache-beam-ml/additional_files/sample_chat_template.jinja' --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'" } - // TODO(https://github.com/apache/beam/pull/36966): Add Dataflow IT - // coverage for the embedded Dynamo path. The example pipeline already - // supports --use_dynamo (see VLLMCompletionsModelHandler). Enabling this - // requires updating vllm.dockerfile.old to install etcd and - // ai-dynamo[vllm], and provisioning a GPU pool with enough memory for - // the model plus the Dynamo runtime (e.g. NVIDIA L4 on g2-standard-4). - // The embedded mode was validated end-to-end on a T4 VM with - // Qwen/Qwen3-0.6B via DirectRunner; this change is scoped to the - // runtime, example, and unit-test changes only. - // - // def dynamoArgMap = argMap + [ - // "machine_type": "g2-standard-4", - // "model": "Qwen/Qwen3-0.6B", - // "output": "gs://apache-beam-ml/outputs/vllm_dynamo_predictions.txt", - // ] - // def dynamoCmdArgs = mapToArgString(dynamoArgMap) - // exec { - // executable 'sh' - // args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $dynamoCmdArgs --use_dynamo --max_tokens 8 --experiment='worker_accelerator=type:nvidia-l4;count:1;install-nvidia-driver:5xx'" - // } + // Embedded NVIDIA Dynamo path. Reuses the same sdk_container_image + // (vllm.dockerfile.old now installs etcd and ai-dynamo[vllm]) and the + // same nvidia-tesla-t4 accelerator as the native vLLM ITs above. + // Validated end-to-end on Dataflow with Qwen/Qwen3-0.6B on T4. + def dynamoArgMap = argMap + [ + "model": "Qwen/Qwen3-0.6B", + "output": "gs://apache-beam-ml/outputs/vllm_dynamo_predictions.txt", + ] + def dynamoCmdArgs = mapToArgString(dynamoArgMap) + exec { + executable 'sh' + args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $dynamoCmdArgs --use_dynamo --max_tokens 8 --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'" + } } }