Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions dimos/mapping/test_voxels.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,151 @@ def test_roundtrip(moment1: Go2MapperMoment, voxel_size: float, expected_points:
mapper.stop()


def test_autoscale_skips_when_saturated() -> None:
    """Test that autoscale skips frames when idle time < last processing duration.

    ``_on_frame`` compares the time elapsed since ``_last_ingest_time`` with
    ``_last_ingest_duration`` (capped at ``1 / autoscale_min_frequency``) and
    skips the frame when the elapsed time is smaller. Both attributes are faked
    here to simulate a mapper that has just finished a slow (500ms) frame and
    immediately receives more frames.

    The replay frames are looked up *before* the fake state is installed, so
    the time spent in ``find_closest_seek`` does not count towards the idle
    time measured by ``_on_frame``.
    """
    data_dir = get_data("unitree_go2_office_walk2")
    lidar_store = TimedSensorReplay(f"{data_dir}/lidar")

    mapper = VoxelGridMapper(autoscale=True, autoscale_min_frequency=1.0)
    try:
        # Process first frame normally to bootstrap (the autoscale check is
        # bypassed while no frame has been processed yet).
        frame = lidar_store.find_closest_seek(1.0)
        assert frame is not None
        mapper._on_frame(frame)
        assert mapper._frames_processed == 1
        frames_fed = 1  # already fed one

        # Look the frames up first so lookup latency stays outside the
        # simulated idle window below.
        candidates = (lidar_store.find_closest_seek(1.5 + i * 0.1) for i in range(10))
        pending = [candidate for candidate in candidates if candidate is not None]

        # Simulate: last frame just finished (ingest_time = now) and took 500ms.
        # Next frames arriving with only 10ms idle time should be skipped.
        mapper._last_ingest_time = time.monotonic()
        mapper._last_ingest_duration = 0.5  # pretend last frame took 500ms

        # Feed the frames with only 10ms idle time each, well under the 500ms threshold
        for frame in pending:
            time.sleep(0.01)  # 10ms idle, way less than 500ms processing time
            mapper._on_frame(frame)
            frames_fed += 1

        print(
            f"\nAutoscale saturated: fed {frames_fed} frames, "
            f"processed {mapper._frames_processed}, skipped {mapper._frames_skipped}"
        )

        # Frames should be skipped: idle time (10ms) < processing time (500ms)
        assert mapper._frames_skipped > 0
        assert mapper._frames_processed < frames_fed
        assert mapper._frames_processed + mapper._frames_skipped == frames_fed
        assert mapper.size() > 0
    finally:
        # Stop the mapper even when an assertion above fails.
        mapper.stop()


def test_autoscale_min_frequency_zero_no_crash() -> None:
    """Check that ``autoscale_min_frequency=0`` does not raise ZeroDivisionError.

    Feeds up to five replay frames (seek offsets 0..4) through ``_on_frame``
    with autoscale enabled and a zero minimum frequency, then verifies that at
    least one frame was actually processed.
    """
    data_dir = get_data("unitree_go2_office_walk2")
    replay = TimedSensorReplay(f"{data_dir}/lidar")

    mapper = VoxelGridMapper(autoscale=True, autoscale_min_frequency=0)

    for seek_offset in range(5):
        cloud = replay.find_closest_seek(seek_offset)
        if cloud is None:
            continue
        # Must not raise ZeroDivisionError.
        mapper._on_frame(cloud)

    assert mapper._frames_processed >= 1
    mapper.stop()


def test_autoscale_disabled_processes_all() -> None:
    """Check that a mapper built with ``autoscale=False`` never skips a frame.

    Every replay frame found at seek offsets 0..9 is pushed through
    ``_on_frame``; the processed counter must equal the number of frames fed
    and the skipped counter must stay at zero.
    """
    data_dir = get_data("unitree_go2_office_walk2")
    replay = TimedSensorReplay(f"{data_dir}/lidar")

    mapper = VoxelGridMapper(autoscale=False)

    frames_fed = 0
    for seek_offset in range(10):
        cloud = replay.find_closest_seek(seek_offset)
        if cloud is None:
            continue
        mapper._on_frame(cloud)
        frames_fed += 1

    print(
        f"\nNo autoscale: fed {frames_fed} frames, processed {mapper._frames_processed}, "
        f"skipped {mapper._frames_skipped}"
    )

    # Nothing may be dropped when autoscale is off.
    assert mapper._frames_processed == frames_fed
    assert mapper._frames_skipped == 0

    mapper.stop()


def test_autoscale_min_frequency_respected() -> None:
    """Test that autoscale never throttles below ``autoscale_min_frequency``.

    ``_on_frame`` skips a frame when the idle time since the previous frame
    finished is smaller than ``min(_last_ingest_duration, 1 / min_frequency)``.
    To exercise the floor deterministically, the previous frame is faked as
    very slow (10s) and each new frame is delivered after idling for longer
    than ``1 / min_frequency``. Without the floor those frames would be
    skipped; with it, every one of them must be processed.
    """
    data_dir = get_data("unitree_go2_office_walk2")
    lidar_store = TimedSensorReplay(f"{data_dir}/lidar")

    min_frequency = 100.0
    max_interval = 1.0 / min_frequency  # 10ms cap on the throttle interval
    mapper = VoxelGridMapper(autoscale=True, autoscale_min_frequency=min_frequency)

    try:
        frames_fed = 0
        for i in range(10):
            frame = lidar_store.find_closest_seek(i)
            if frame is None:
                continue
            if frames_fed > 0:
                # Pretend the previous frame took far longer than max_interval,
                # so only the min_frequency floor can let this frame through.
                mapper._last_ingest_duration = 10.0
                # Idle for longer than the floor so the outcome does not depend
                # on how long the replay lookup above happened to take.
                time.sleep(2 * max_interval)
            mapper._on_frame(frame)
            frames_fed += 1

        print(
            f"\nHigh min_freq: fed {frames_fed} frames, processed {mapper._frames_processed}, "
            f"skipped {mapper._frames_skipped}"
        )

        # Guard against a vacuous pass when the replay yields no frames.
        assert frames_fed > 0
        # Throttle interval is min(10s, 10ms) = 10ms and each frame idled ~20ms,
        # so nothing may be skipped.
        assert mapper._frames_processed == frames_fed
        assert mapper._frames_skipped == 0
    finally:
        # Stop the mapper even when an assertion above fails.
        mapper.stop()


def test_autoscale_rerun_logging() -> None:
    """Smoke-test ingest and publish with autoscale on and default telemetry.

    Runs one frame through ``_on_frame`` and then calls
    ``publish_global_map``; both contain the ``rr.log`` calls. The test passes
    when neither call raises, exactly one frame was processed, and both
    recorded durations are positive.
    """
    data_dir = get_data("unitree_go2_office_walk2")
    replay = TimedSensorReplay(f"{data_dir}/lidar")
    mapper = VoxelGridMapper(autoscale=True)

    cloud = replay.find_closest_seek(1.0)
    assert cloud is not None

    # Neither call should raise.
    mapper._on_frame(cloud)
    mapper.publish_global_map()

    assert mapper._frames_processed == 1
    assert mapper._last_ingest_duration > 0
    assert mapper._last_publish_duration > 0

    print(
        f"\nIngest: {mapper._last_ingest_duration * 1000:.1f}ms, "
        f"Publish: {mapper._last_publish_duration * 1000:.1f}ms"
    )

    mapper.stop()


def test_roundtrip_range_preserved(mapper: VoxelGridMapper) -> None:
"""Test that input coordinate ranges are preserved in output."""
data_dir = get_data("unitree_go2_office_walk2")
Expand Down
71 changes: 70 additions & 1 deletion dimos/mapping/voxels.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from reactivex import interval, operators as ops
from reactivex.disposable import Disposable
from reactivex.subject import Subject
import rerun as rr

from dimos.core import In, Module, Out, rpc
from dimos.core.global_config import GlobalConfig, global_config
Expand All @@ -42,6 +43,10 @@ class Config(ModuleConfig):
block_count: int = 2_000_000
device: str = "CUDA:0"
carve_columns: bool = True
# Auto-scaling: skip frames when processing can't keep up
autoscale: bool = True
autoscale_min_frequency: float = 1.0 # never drop below this Hz
enable_telemetry: bool = True # log timing metrics to rerun


class VoxelGridMapper(Module):
Expand Down Expand Up @@ -78,6 +83,24 @@ def __init__(self, cfg: GlobalConfig = global_config, **kwargs: object) -> None:
self._key_dtype = self._voxel_hashmap.key_tensor().dtype
self._latest_frame_ts: float = 0.0

# Autoscale state
self._last_ingest_time: float = 0.0 # monotonic time of last add_frame()
self._last_ingest_duration: float = 0.0 # how long last add_frame() took
self._last_publish_duration: float = 0.0 # how long last publish took
self._frames_skipped: int = 0
self._frames_processed: int = 0
self._rr_initialized: bool = False

def _ensure_rr(self) -> None:
    """Call ``rr.init("dimos", spawn=False)`` once per instance.

    The ``_rr_initialized`` flag makes every call after the first successful
    one a no-op.
    """
    if not self._rr_initialized:
        # NOTE(review): the original author's comment states that this attaches
        # to an existing "dimos" recording if the bridge started one (creating
        # a new one otherwise), and that init() with spawn=False is safe even
        # if another process already called init("dimos"). Confirm against
        # the rerun documentation.
        rr.init("dimos", spawn=False)
        self._rr_initialized = True

@rpc
def start(self) -> None:
super().start()
Expand Down Expand Up @@ -106,12 +129,58 @@ def stop(self) -> None:
super().stop()

def _on_frame(self, frame: PointCloud2) -> None:
if self.config.autoscale and self._frames_processed > 0:
now = time.monotonic()
# _last_ingest_time is set *after* processing completes, so this
# measures idle time since the last frame finished — not since it started.
# If idle time < last processing duration, we're falling behind: skip.
elapsed_since_last = now - self._last_ingest_time

# Guard against misconfiguration: min_frequency=0 would divide by zero.
# Treat 0 (or negative) as "no minimum floor" → use processing time only.
if self.config.autoscale_min_frequency > 0:
max_interval = 1.0 / self.config.autoscale_min_frequency
throttle_interval = min(self._last_ingest_duration, max_interval)
else:
throttle_interval = self._last_ingest_duration

if elapsed_since_last < throttle_interval:
self._frames_skipped += 1
return

t0 = time.monotonic()
self.add_frame(frame)
if self.config.publish_interval == 0:
ingest_duration = time.monotonic() - t0

# Record completion time (not start time) so the next frame's elapsed
# measures idle time between frames, enabling real saturation detection.
self._last_ingest_time = time.monotonic()
self._last_ingest_duration = ingest_duration
Comment on lines +151 to +158
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_last_ingest_time = t0 limits autoscale effectiveness

_last_ingest_time is set to t0 (the time before add_frame() runs). Since all transports (LCM, SharedMemory, ROS) deliver callbacks synchronously on a single thread, the next _on_frame call can only begin after the current one returns. This means elapsed_since_last = now - t0 will always be >= _last_ingest_duration, so the skip condition elapsed < throttle_interval is effectively never true in production.

The autoscale will only skip frames when _on_frame is called directly from a separate thread (not through the transport), or if _last_ingest_duration is artificially set higher than the actual processing time (as done in the test).

If you want skipping to work with synchronous transport delivery, consider setting _last_ingest_time to time.monotonic() (after processing completes), so the elapsed-time check measures the gap between frames exclusive of processing time:

Suggested change
t0 = time.monotonic()
self.add_frame(frame)
if self.config.publish_interval == 0:
ingest_duration = time.monotonic() - t0
self._last_ingest_time = t0
self._last_ingest_duration = ingest_duration
t0 = time.monotonic()
self.add_frame(frame)
ingest_duration = time.monotonic() - t0
self._last_ingest_time = time.monotonic()
self._last_ingest_duration = ingest_duration

With this change, frames that arrive back-to-back (queued in LCM during processing) would be correctly identified as arriving faster than the processing rate.

self._frames_processed += 1

if self.config.enable_telemetry:
self._ensure_rr()
rr.set_time("frame", sequence=self._frames_processed)
rr.set_time("wall_time", timestamp=time.time())
rr.log("voxel_mapper/ingest_time_ms", rr.Scalars(ingest_duration * 1000))
rr.log("voxel_mapper/map_size", rr.Scalars(self.size()))
rr.log("voxel_mapper/frames_skipped", rr.Scalars(self._frames_skipped))

if self.config.publish_interval == 0 and hasattr(self, "_publish_trigger"):
self._publish_trigger.on_next(None)

def publish_global_map(self) -> None:
    """Build the global point cloud, record the build time, and publish it.

    Only the ``get_global_pointcloud2()`` call is timed; the result is stored
    in ``_last_publish_duration`` and, when ``config.enable_telemetry`` is set,
    logged to rerun (in milliseconds) before the cloud is published on
    ``global_map``.
    """
    started = time.monotonic()
    cloud = self.get_global_pointcloud2()
    elapsed = time.monotonic() - started
    self._last_publish_duration = elapsed

    if self.config.enable_telemetry:
        self._ensure_rr()
        # Stamp the sample on both the frame-sequence and wall-clock timelines.
        rr.set_time("frame", sequence=self._frames_processed)
        rr.set_time("wall_time", timestamp=time.time())
        rr.log("voxel_mapper/publish_time_ms", rr.Scalars(elapsed * 1000))

    self.global_map.publish(cloud)

def size(self) -> int:
Expand Down
Loading