Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 93 additions & 12 deletions utilities/stress_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ def create_yolo(pipeline: dai.Pipeline, camera: dai.node.ColorCamera) -> Tuple[s
yoloDet.input.setBlocking(False)
camera.preview.link(yoloDet.input)
xoutColor = pipeline.createXLinkOut()
passthrough_q_name = f"preview_{camera.getBoardSocket()}"
# Match the rest of this script's naming: "preview_<SOCKET_NAME>"
passthrough_q_name = "preview_" + camera.getBoardSocket().name
xoutColor.setStreamName(passthrough_q_name)
yoloDet.passthrough.link(xoutColor.input)
xout_yolo = pipeline.createXLinkOut()
Expand Down Expand Up @@ -150,6 +151,24 @@ def stress_test(mxid: str = ""):
parser = argparse.ArgumentParser()
parser.add_argument("-ne", "--n-edge-detectors", default=0, type=int, help="Number of edge detectors to create.")
parser.add_argument("--no-nnet", action="store_true", default=False, help="Don't create a neural network.")
parser.add_argument(
"--no-stereo",
action="store_true",
default=False,
help="Don't create stereo depth (even if a stereo pair is present).",
)
parser.add_argument(
"--slow-rampup",
action="store_true",
default=False,
help="Ramp IR dot/flood brightness after the pipeline starts (helps avoid device crashes).",
)
parser.add_argument(
"--rampup-seconds",
type=float,
default=5.0,
help="Duration (seconds) for --slow-rampup. Recommended: 5.",
)

# May have some unknown args
args, _ = parser.parse_known_args()
Expand All @@ -159,24 +178,76 @@ def stress_test(mxid: str = ""):
exp_time = 20000

import time

success, device_info = dai.Device.getDeviceByMxId(mxid)
cam_args = [] # Device info or no args at all
if success:
cam_args.append(device_info)
with dai.Device(*cam_args) as device:
print("Setting default dot intensity to", dot_intensity)
device.setIrLaserDotProjectorIntensity(dot_intensity)
print("Setting default flood intensity to", flood_intensity)
device.setIrFloodLightIntensity(flood_intensity)
if args.slow_rampup:
# Put IR to a known safe baseline before starting the pipeline.
# The actual configured targets are applied after the pipeline is running.
try:
device.setIrLaserDotProjectorIntensity(0.0)
device.setIrFloodLightIntensity(0.0)
except Exception as e:
print("Warning: Failed to set IR intensity baseline:", repr(e))
else:
print("Setting default dot intensity to", dot_intensity)
device.setIrLaserDotProjectorIntensity(dot_intensity)
print("Setting default flood intensity to", flood_intensity)
device.setIrFloodLightIntensity(flood_intensity)

pipeline, outputs, pipeline_context = build_pipeline(device, args)
device.startPipeline(pipeline)

ramp_start_t: Optional[float] = None
ramp_last_set_t: float = 0.0
ramp_last_dot: Optional[float] = None
ramp_last_flood: Optional[float] = None

if args.slow_rampup:
print(
f"Slow rampup enabled: ramping dot={dot_intensity:.2f}, flood={flood_intensity:.2f} over {args.rampup_seconds:.1f}s (non-blocking)"
)
ramp_start_t = time.time()

start_time = time.time()
queues = [device.getOutputQueue(name, size, False)
for name, size in outputs if name != "sys_log"]
camera_control_q = device.getInputQueue("cam_control")
sys_info_q = device.getOutputQueue("sys_log", 1, False)
usb_speed = device.getUsbSpeed()
while True:
# Ramp IR on-the-fly while we keep draining queues.
if ramp_start_t is not None:
now = time.time()
dur = float(args.rampup_seconds)
# Rate-limit device setter calls; these are RPCs and can be chatty.
if dur <= 0:
frac = 1.0
else:
frac = clamp((now - ramp_start_t) / dur, 0.0, 1.0)

# Update at most ~20Hz, or on final completion.
if (now - ramp_last_set_t) >= 0.05 or frac >= 1.0:
dot = dot_intensity * frac
flood = flood_intensity * frac
try:
if ramp_last_dot is None or abs(dot - ramp_last_dot) >= 1e-3:
device.setIrLaserDotProjectorIntensity(dot)
ramp_last_dot = dot
if ramp_last_flood is None or abs(flood - ramp_last_flood) >= 1e-3:
device.setIrFloodLightIntensity(flood)
ramp_last_flood = flood
except Exception as e:
print("Warning: IR rampup failed:", repr(e))
ramp_start_t = None
ramp_last_set_t = now

if frac >= 1.0:
ramp_start_t = None

for queue in queues:
packet = queue.tryGet()
# print("QUEUE", queue.getName(), "PACKET", packet)
Expand Down Expand Up @@ -431,7 +502,7 @@ def build_pipeline(device: dai.Device, args) -> Tuple[dai.Pipeline, List[Tuple[s
edge_detector.outputImage.link(edge_detector_xlink.input)
xlink_outs.append((stream_name, 5))

if left and right:
if left and right and not args.no_stereo:
if left.getResolutionWidth() > 1280:
print("Left camera width is greater than 1280, setting ISP scale to 2/3")
left.setIspScale(2, 3)
Expand All @@ -442,18 +513,19 @@ def build_pipeline(device: dai.Device, args) -> Tuple[dai.Pipeline, List[Tuple[s
output = "out" if hasattr(left, "out") else "video"
getattr(left, output).link(stereo.left)
getattr(right, output).link(stereo.right)
stereo.setDefaultProfilePreset(
dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
stereo.setOutputSize(left.getResolutionWidth(),
left.getResolutionHeight())
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
stereo.setOutputSize(left.getResolutionWidth(), left.getResolutionHeight())
stereo.setLeftRightCheck(True)
stereo.setSubpixel(True)
stereo.setDepthAlign(align_socket)
else:
print("Device doesn't have a stereo pair, skipping stereo depth creation...")
if args.no_stereo and left and right:
print("--no-stereo set, skipping stereo depth creation...")
else:
print("Device doesn't have a stereo pair, skipping stereo depth creation...")
if color_cam is not None:
if not args.no_nnet:
if left is not None and right is not None: # Create spatial detection net
if left is not None and right is not None and not args.no_stereo: # Create spatial detection net
print("Creating spatial detection network...")
yolo = pipeline.createYoloSpatialDetectionNetwork()
blob_path = get_or_download_yolo_blob()
Expand All @@ -472,6 +544,15 @@ def build_pipeline(device: dai.Device, args) -> Tuple[dai.Pipeline, List[Tuple[s
color_cam.preview.link(yolo.input)
stereo.depth.link(yolo.inputDepth)

# Always export the color preview stream, even when the color cam is used by the spatial NN
# (otherwise CAM_C looks "missing" since only depth+yolo are published).
passthrough_q_name = "preview_" + color_cam.getBoardSocket().name
xout_color = pipeline.createXLinkOut()
xout_color.setStreamName(passthrough_q_name)
yolo.passthrough.link(xout_color.input)
xlink_outs.append((passthrough_q_name, 4))
context.q_name_yolo_passthrough = passthrough_q_name

xout_depth = pipeline.createXLinkOut()
depth_q_name = "stereo depth"
xout_depth.setStreamName(depth_q_name)
Expand Down
Loading