diff --git a/fastdeploy/inter_communicator/zmq_server.py b/fastdeploy/inter_communicator/zmq_server.py index 19cddf64928..2b6787864cf 100644 --- a/fastdeploy/inter_communicator/zmq_server.py +++ b/fastdeploy/inter_communicator/zmq_server.py @@ -15,6 +15,7 @@ """ import os +import pickle import threading import time import traceback @@ -110,7 +111,11 @@ def recv_json(self, flags: int = 0): try: # receive from socket msg = self.socket.recv(flags=flags) - data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf)) + try: + data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf)) + except (UnicodeDecodeError, ValueError) as e: + llm_logger.warning(f"recv_json decode failed, msg={msg}, err={e}") + raise # collect zmq recv metrics _zmq_metrics_stats.msg_bytes_recv_total += len(msg) @@ -152,7 +157,11 @@ def recv_pyobj(self, flags: int = 0): _zmq_metrics_stats = ZMQMetricsStats() self._ensure_socket() data_bytes = self.socket.recv(flags=flags) - envelope = ForkingPickler.loads(data_bytes) + try: + envelope = ForkingPickler.loads(data_bytes) + except (UnicodeDecodeError, ValueError, pickle.UnpicklingError) as e: + llm_logger.warning(f"recv_pyobj decode failed, msg={data_bytes}, err={e}") + raise if isinstance(envelope, dict): if "__meta" in envelope and "send_ts" in envelope["__meta"]: _zmq_metrics_stats.msg_recv_total += 1 @@ -539,7 +548,12 @@ def recv_control_cmd(self): """ self._ensure_socket() try: - client, _, task_data = self.socket.recv_multipart(flags=zmq.NOBLOCK) + frames = self.socket.recv_multipart(flags=zmq.NOBLOCK) + if len(frames) < 2: + llm_logger.warning(f"recv_control_cmd: unexpected frame count {len(frames)}, dropping message") + return None + client = frames[0] + task_data = frames[-1] task = msgpack.unpackb(task_data) task_id_str = task["task_id"] except zmq.Again: @@ -577,6 +591,8 @@ def close(self): llm_logger.info("ZMQ server is closing connection...") try: if self.socket is not None and not self.socket.closed: + if self.address: + self.socket.unbind(self.address) self.socket.close() if not self.context.closed: self.context.term()