From e3c7881612b5eeadb0c3fea946cb644b56a018e6 Mon Sep 17 00:00:00 2001 From: zrezke Date: Fri, 22 May 2026 13:54:25 +0200 Subject: [PATCH 1/3] stress-test: Add a flag to slowly ramp up dot and flood lights. --- utilities/stress_test.py | 73 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 4 deletions(-) diff --git a/utilities/stress_test.py b/utilities/stress_test.py index 88493336c..2efeb1254 100644 --- a/utilities/stress_test.py +++ b/utilities/stress_test.py @@ -150,6 +150,18 @@ def stress_test(mxid: str = ""): parser = argparse.ArgumentParser() parser.add_argument("-ne", "--n-edge-detectors", default=0, type=int, help="Number of edge detectors to create.") parser.add_argument("--no-nnet", action="store_true", default=False, help="Don't create a neural network.") + parser.add_argument( + "--slow-rampup", + action="store_true", + default=False, + help="Ramp IR dot/flood brightness after the pipeline starts (helps avoid device crashes).", + ) + parser.add_argument( + "--rampup-seconds", + type=float, + default=5.0, + help="Duration (seconds) for --slow-rampup. Recommended: 5.", + ) # May have some unknown args args, _ = parser.parse_known_args() @@ -159,17 +171,40 @@ def stress_test(mxid: str = ""): exp_time = 20000 import time + success, device_info = dai.Device.getDeviceByMxId(mxid) cam_args = [] # Device info or no args at all if success: cam_args.append(device_info) with dai.Device(*cam_args) as device: - print("Setting default dot intensity to", dot_intensity) - device.setIrLaserDotProjectorIntensity(dot_intensity) - print("Setting default flood intensity to", flood_intensity) - device.setIrFloodLightIntensity(flood_intensity) + if args.slow_rampup: + # Put IR to a known safe baseline before starting the pipeline. + # The actual configured targets are applied after the pipeline is running. + try: + device.setIrLaserDotProjectorIntensity(0.0) + device.setIrFloodLightIntensity(0.0) + except Exception as e: + print("Warning: Failed to set IR intensity baseline:", repr(e)) + else: + print("Setting default dot intensity to", dot_intensity) + device.setIrLaserDotProjectorIntensity(dot_intensity) + print("Setting default flood intensity to", flood_intensity) + device.setIrFloodLightIntensity(flood_intensity) + pipeline, outputs, pipeline_context = build_pipeline(device, args) device.startPipeline(pipeline) + + ramp_start_t: Optional[float] = None + ramp_last_set_t: float = 0.0 + ramp_last_dot: Optional[float] = None + ramp_last_flood: Optional[float] = None + + if args.slow_rampup: + print( + f"Slow rampup enabled: ramping dot={dot_intensity:.2f}, flood={flood_intensity:.2f} over {args.rampup_seconds:.1f}s (non-blocking)" + ) + ramp_start_t = time.time() + start_time = time.time() queues = [device.getOutputQueue(name, size, False) for name, size in outputs if name != "sys_log"] @@ -177,6 +212,35 @@ def stress_test(mxid: str = ""): sys_info_q = device.getOutputQueue("sys_log", 1, False) usb_speed = device.getUsbSpeed() while True: + # Ramp IR on-the-fly while we keep draining queues. + if ramp_start_t is not None: + now = time.time() + dur = float(args.rampup_seconds) + # Rate-limit device setter calls; these are RPCs and can be chatty. + if dur <= 0: + frac = 1.0 + else: + frac = clamp((now - ramp_start_t) / dur, 0.0, 1.0) + + # Update at most ~20Hz, or on final completion. + if (now - ramp_last_set_t) >= 0.05 or frac >= 1.0: + dot = dot_intensity * frac + flood = flood_intensity * frac + try: + if ramp_last_dot is None or abs(dot - ramp_last_dot) >= 1e-3: + device.setIrLaserDotProjectorIntensity(dot) + ramp_last_dot = dot + if ramp_last_flood is None or abs(flood - ramp_last_flood) >= 1e-3: + device.setIrFloodLightIntensity(flood) + ramp_last_flood = flood + except Exception as e: + print("Warning: IR rampup failed:", repr(e)) + ramp_start_t = None + ramp_last_set_t = now + + if frac >= 1.0: + ramp_start_t = None + for queue in queues: packet = queue.tryGet() # print("QUEUE", queue.getName(), "PACKET", packet) @@ -296,6 +360,7 @@ def build_pipeline(device: dai.Device, args) -> Tuple[dai.Pipeline, List[Tuple[s eeprom = calib.getEepromData() left_socket = eeprom.stereoRectificationData.leftCameraSocket right_socket = eeprom.stereoRectificationData.rightCameraSocket + print(f"L: {left_socket} | R: {right_socket}") align_socket = [ cam.socket for cam in camera_features From ebdc1281313b2185fce3beede3dcb52fbe77426e Mon Sep 17 00:00:00 2001 From: Filip Jeretina <59307111+zrezke@users.noreply.github.com> Date: Fri, 22 May 2026 13:55:41 +0200 Subject: [PATCH 2/3] Update utilities/stress_test.py --- utilities/stress_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/utilities/stress_test.py b/utilities/stress_test.py index 2efeb1254..82b585cb0 100644 --- a/utilities/stress_test.py +++ b/utilities/stress_test.py @@ -360,7 +360,6 @@ def build_pipeline(device: dai.Device, args) -> Tuple[dai.Pipeline, List[Tuple[s eeprom = calib.getEepromData() left_socket = eeprom.stereoRectificationData.leftCameraSocket right_socket = eeprom.stereoRectificationData.rightCameraSocket - print(f"L: {left_socket} | R: {right_socket}") align_socket = [ cam.socket for cam in camera_features From 8417eb6c317db19658656be67d56b5e21557c411 Mon Sep 17 00:00:00 2001 From: zrezke Date: Mon, 25 May 2026 01:29:47 +0200 Subject: [PATCH 3/3] Added flag for disabling stereo on stress_test.py --- utilities/stress_test.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/utilities/stress_test.py b/utilities/stress_test.py index 82b585cb0..9c5c3dc2a 100644 --- a/utilities/stress_test.py +++ b/utilities/stress_test.py @@ -119,7 +119,8 @@ def create_yolo(pipeline: dai.Pipeline, camera: dai.node.ColorCamera) -> Tuple[s yoloDet.input.setBlocking(False) camera.preview.link(yoloDet.input) xoutColor = pipeline.createXLinkOut() - passthrough_q_name = f"preview_{camera.getBoardSocket()}" + # Match the rest of this script's naming: "preview_" + passthrough_q_name = "preview_" + camera.getBoardSocket().name xoutColor.setStreamName(passthrough_q_name) yoloDet.passthrough.link(xoutColor.input) xout_yolo = pipeline.createXLinkOut() @@ -150,6 +151,12 @@ def stress_test(mxid: str = ""): parser = argparse.ArgumentParser() parser.add_argument("-ne", "--n-edge-detectors", default=0, type=int, help="Number of edge detectors to create.") parser.add_argument("--no-nnet", action="store_true", default=False, help="Don't create a neural network.") + parser.add_argument( + "--no-stereo", + action="store_true", + default=False, + help="Don't create stereo depth (even if a stereo pair is present).", + ) parser.add_argument( "--slow-rampup", action="store_true", @@ -495,7 +502,7 @@ def build_pipeline(device: dai.Device, args) -> Tuple[dai.Pipeline, List[Tuple[s edge_detector.outputImage.link(edge_detector_xlink.input) xlink_outs.append((stream_name, 5)) - if left and right: + if left and right and not args.no_stereo: if left.getResolutionWidth() > 1280: print("Left camera width is greater than 1280, setting ISP scale to 2/3") left.setIspScale(2, 3) @@ -506,18 +513,19 @@ def build_pipeline(device: dai.Device, args) -> Tuple[dai.Pipeline, List[Tuple[s output = "out" if hasattr(left, "out") else "video" getattr(left, output).link(stereo.left) getattr(right, output).link(stereo.right) - stereo.setDefaultProfilePreset( - dai.node.StereoDepth.PresetMode.HIGH_DENSITY) - stereo.setOutputSize(left.getResolutionWidth(), - left.getResolutionHeight()) + stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY) + stereo.setOutputSize(left.getResolutionWidth(), left.getResolutionHeight()) stereo.setLeftRightCheck(True) stereo.setSubpixel(True) stereo.setDepthAlign(align_socket) else: - print("Device doesn't have a stereo pair, skipping stereo depth creation...") + if args.no_stereo and left and right: + print("--no-stereo set, skipping stereo depth creation...") + else: + print("Device doesn't have a stereo pair, skipping stereo depth creation...") if color_cam is not None: if not args.no_nnet: - if left is not None and right is not None: # Create spatial detection net + if left is not None and right is not None and not args.no_stereo: # Create spatial detection net print("Creating spatial detection network...") yolo = pipeline.createYoloSpatialDetectionNetwork() blob_path = get_or_download_yolo_blob() @@ -536,6 +544,15 @@ def build_pipeline(device: dai.Device, args) -> Tuple[dai.Pipeline, List[Tuple[s color_cam.preview.link(yolo.input) stereo.depth.link(yolo.inputDepth) + # Always export the color preview stream, even when the color cam is used by the spatial NN + # (otherwise CAM_C looks "missing" since only depth+yolo are published). + passthrough_q_name = "preview_" + color_cam.getBoardSocket().name + xout_color = pipeline.createXLinkOut() + xout_color.setStreamName(passthrough_q_name) + yolo.passthrough.link(xout_color.input) + xlink_outs.append((passthrough_q_name, 4)) + context.q_name_yolo_passthrough = passthrough_q_name + xout_depth = pipeline.createXLinkOut() depth_q_name = "stereo depth" xout_depth.setStreamName(depth_q_name)