Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions fastdeploy/inter_communicator/zmq_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""

import os
import pickle
import threading
import time
import traceback
Expand Down Expand Up @@ -110,7 +111,11 @@ def recv_json(self, flags: int = 0):
try:
# receive from socket
msg = self.socket.recv(flags=flags)
data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf))
try:
data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf))
except (UnicodeDecodeError, ValueError) as e:
llm_logger.warning(f"recv_json decode failed, msg={msg}, err={e}")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 将原始消息字节 msg 直接打印到 warning 日志,若消息包含 prompt 等敏感内容,可能造成信息泄漏,且大消息体会产生大量日志。

建议只打印字节长度:

llm_logger.warning(f"recv_json decode failed, msg_len={len(msg)}, err={e}")

raise

# collect zmq recv metrics
_zmq_metrics_stats.msg_bytes_recv_total += len(msg)
Expand Down Expand Up @@ -152,7 +157,11 @@ def recv_pyobj(self, flags: int = 0):
_zmq_metrics_stats = ZMQMetricsStats()
self._ensure_socket()
data_bytes = self.socket.recv(flags=flags)
envelope = ForkingPickler.loads(data_bytes)
try:
envelope = ForkingPickler.loads(data_bytes)
except (UnicodeDecodeError, ValueError, pickle.UnpicklingError) as e:
llm_logger.warning(f"recv_pyobj decode failed, msg={data_bytes}, err={e}")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 将原始消息字节 data_bytes 直接打印到 warning 日志,若消息包含 prompt / token 数据,可能造成敏感信息泄漏,且大消息体会产生大量日志。

建议只打印字节长度:

llm_logger.warning(f"recv_pyobj decode failed, msg_len={len(data_bytes)}, err={e}")

raise
if isinstance(envelope, dict):
if "__meta" in envelope and "send_ts" in envelope["__meta"]:
_zmq_metrics_stats.msg_recv_total += 1
Expand Down Expand Up @@ -539,7 +548,12 @@ def recv_control_cmd(self):
"""
self._ensure_socket()
try:
client, _, task_data = self.socket.recv_multipart(flags=zmq.NOBLOCK)
frames = self.socket.recv_multipart(flags=zmq.NOBLOCK)
if len(frames) < 2:
llm_logger.warning(f"recv_control_cmd: unexpected frame count {len(frames)}, dropping message")
return None
client = frames[0]
task_data = frames[-1]
task = msgpack.unpackb(task_data)
task_id_str = task["task_id"]
except zmq.Again:
Expand Down Expand Up @@ -577,6 +591,8 @@ def close(self):
llm_logger.info("ZMQ server is closing connection...")
try:
if self.socket is not None and not self.socket.closed:
if self.address:
self.socket.unbind(self.address)
Comment thread
cmcamdy marked this conversation as resolved.
Comment thread
cmcamdy marked this conversation as resolved.
self.socket.close()
if not self.context.closed:
self.context.term()
Expand Down
Loading