From b5e158577b80650437f2ad2f4f375f0a1dad3349 Mon Sep 17 00:00:00 2001 From: cmcamdy Date: Mon, 18 May 2026 12:04:57 +0000 Subject: [PATCH 1/3] fix zmq err catch --- fastdeploy/inter_communicator/zmq_server.py | 30 +++++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/fastdeploy/inter_communicator/zmq_server.py b/fastdeploy/inter_communicator/zmq_server.py index 19cddf64928..56b6d89249a 100644 --- a/fastdeploy/inter_communicator/zmq_server.py +++ b/fastdeploy/inter_communicator/zmq_server.py @@ -15,6 +15,7 @@ """ import os +import pickle import threading import time import traceback @@ -109,8 +110,13 @@ def recv_json(self, flags: int = 0): _zmq_metrics_stats = ZMQMetricsStats() try: # receive from socket - msg = self.socket.recv(flags=flags) - data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf)) + frame = self.socket.recv(copy=False, flags=flags) + msg = frame.bytes + try: + data_dict = jsonapi.loads(msg) + except (UnicodeDecodeError, ValueError) as e: + llm_logger.warning(f"recv_json decode failed, msg={msg}, err={e}") + raise # collect zmq recv metrics _zmq_metrics_stats.msg_bytes_recv_total += len(msg) @@ -151,8 +157,13 @@ def recv_pyobj(self, flags: int = 0): """ _zmq_metrics_stats = ZMQMetricsStats() self._ensure_socket() - data_bytes = self.socket.recv(flags=flags) - envelope = ForkingPickler.loads(data_bytes) + frame = self.socket.recv(copy=False, flags=flags) + data_bytes = frame.bytes + try: + envelope = ForkingPickler.loads(data_bytes) + except (UnicodeDecodeError, ValueError, pickle.UnpicklingError) as e: + llm_logger.warning(f"recv_pyobj decode failed, msg={data_bytes}, err={e}") + raise if isinstance(envelope, dict): if "__meta" in envelope and "send_ts" in envelope["__meta"]: _zmq_metrics_stats.msg_recv_total += 1 @@ -494,6 +505,8 @@ def close(self): self.worker_push_addresses.clear() if self.socket is not None and not self.socket.closed: + if self.address: + self.socket.unbind(self.address) self.socket.close() if not self.context.closed: self.context.term() @@ -539,7 +552,12 @@ def recv_control_cmd(self): """ self._ensure_socket() try: - client, _, task_data = self.socket.recv_multipart(flags=zmq.NOBLOCK) + frames = self.socket.recv_multipart(flags=zmq.NOBLOCK) + if len(frames) < 2: + llm_logger.warning(f"recv_control_cmd: unexpected frame count {len(frames)}, dropping message") + return None + client = frames[0] + task_data = frames[-1] task = msgpack.unpackb(task_data) task_id_str = task["task_id"] except zmq.Again: @@ -577,6 +595,8 @@ def close(self): llm_logger.info("ZMQ server is closing connection...") try: if self.socket is not None and not self.socket.closed: + if self.address: + self.socket.unbind(self.address) self.socket.close() if not self.context.closed: self.context.term() From 119079687b8a9e1e04088c6c9d8e708583aa85f3 Mon Sep 17 00:00:00 2001 From: cmcamdy Date: Tue, 19 May 2026 02:36:36 +0000 Subject: [PATCH 2/3] fix unit --- fastdeploy/inter_communicator/zmq_server.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/fastdeploy/inter_communicator/zmq_server.py b/fastdeploy/inter_communicator/zmq_server.py index 56b6d89249a..9973d3b2854 100644 --- a/fastdeploy/inter_communicator/zmq_server.py +++ b/fastdeploy/inter_communicator/zmq_server.py @@ -110,8 +110,7 @@ def recv_json(self, flags: int = 0): _zmq_metrics_stats = ZMQMetricsStats() try: # receive from socket - frame = self.socket.recv(copy=False, flags=flags) - msg = frame.bytes + msg = self.socket.recv(flags=flags) try: data_dict = jsonapi.loads(msg) except (UnicodeDecodeError, ValueError) as e: @@ -157,8 +156,7 @@ def recv_pyobj(self, flags: int = 0): """ _zmq_metrics_stats = ZMQMetricsStats() self._ensure_socket() - frame = self.socket.recv(copy=False, flags=flags) - data_bytes = frame.bytes + data_bytes = self.socket.recv(flags=flags) try: envelope = ForkingPickler.loads(data_bytes) except (UnicodeDecodeError, ValueError, pickle.UnpicklingError) as e: @@ -505,8 +503,6 @@ def close(self): self.worker_push_addresses.clear() if self.socket is not None and not self.socket.closed: - if self.address: - self.socket.unbind(self.address) self.socket.close() if not self.context.closed: self.context.term() From b2bd32d39d25c17340fc67b296a355f54d27035c Mon Sep 17 00:00:00 2001 From: cmcamdy Date: Tue, 19 May 2026 02:39:03 +0000 Subject: [PATCH 3/3] fix unit --- fastdeploy/inter_communicator/zmq_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/inter_communicator/zmq_server.py b/fastdeploy/inter_communicator/zmq_server.py index 9973d3b2854..2b6787864cf 100644 --- a/fastdeploy/inter_communicator/zmq_server.py +++ b/fastdeploy/inter_communicator/zmq_server.py @@ -112,7 +112,7 @@ def recv_json(self, flags: int = 0): # receive from socket msg = self.socket.recv(flags=flags) try: - data_dict = jsonapi.loads(msg) + data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf)) except (UnicodeDecodeError, ValueError) as e: llm_logger.warning(f"recv_json decode failed, msg={msg}, err={e}") raise