Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
de485fd
feat(cli): add daemon mode for dimos run (DIM-681)
spomichter Mar 6, 2026
ff5094d
fix: address greptile review — fd leak, wrong PID, fabricated log path
spomichter Mar 6, 2026
9cde9ef
feat(cli): add dimos stop and dimos status commands (DIM-682, DIM-684)
spomichter Mar 6, 2026
21f16e7
test: add e2e daemon lifecycle tests with PingPong blueprint
spomichter Mar 6, 2026
3854c8a
fix: rename stderr.log to daemon.log (addresses greptile review)
spomichter Mar 6, 2026
d1fbc51
fix: resolve mypy type errors in stop command (DIM-681)
spomichter Mar 6, 2026
cf3560f
feat: per-run log directory with unified main.jsonl (DIM-685)
spomichter Mar 6, 2026
f1e5a01
fix: migrate existing FileHandlers when set_run_log_dir is called
spomichter Mar 6, 2026
1541993
chore: move daemon tests to dimos/core/ for CI discovery
spomichter Mar 6, 2026
50d456d
chore: mark e2e daemon tests as slow
spomichter Mar 6, 2026
7446f75
test: add CLI integration tests for dimos stop and dimos status (DIM-…
spomichter Mar 6, 2026
0b0caa2
test: add e2e CLI tests against real running blueprint (DIM-682, DIM-…
spomichter Mar 6, 2026
9587115
fix: address paul's review comments
spomichter Mar 6, 2026
5d8b76b
fix: drop daemon.log, redirect all stdio to /dev/null
spomichter Mar 6, 2026
8ce1a9d
fix: restore LOG_BASE_DIR import, remove duplicate set_run_log_dir im…
spomichter Mar 6, 2026
ec1a3f4
fix: address remaining paul review comments
spomichter Mar 6, 2026
c33bd9b
fix: address all remaining paul review comments
spomichter Mar 6, 2026
6f7c4c5
fix: remove module docstring from test_daemon.py
spomichter Mar 6, 2026
e773195
fix: resolve mypy 'RunEntry is not defined' in CLI
spomichter Mar 7, 2026
37cc48a
refactor: address remaining Paul review comments
spomichter Mar 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions dimos/agents/mcp/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,12 @@ def on_system_modules(self, modules: list[RPCClient]) -> None:
for skill in app.state.skills
}

def _start_server(self, port: int = 9990) -> None:
config = uvicorn.Config(app, host="0.0.0.0", port=port, log_level="info")
def _start_server(self, port: int | None = None) -> None:
from dimos.core.global_config import global_config

_port = port if port is not None else global_config.mcp_port
_host = global_config.mcp_host
config = uvicorn.Config(app, host=_host, port=_port, log_level="info")
server = uvicorn.Server(config)
self._uvicorn_server = server
loop = self._loop
Expand Down
104 changes: 104 additions & 0 deletions dimos/core/daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Daemonization and health-check support for DimOS processes."""

from __future__ import annotations

import os
import signal
import sys
from typing import TYPE_CHECKING

from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from pathlib import Path

from dimos.core.module_coordinator import ModuleCoordinator
from dimos.core.run_registry import RunEntry

logger = setup_logger()

# ---------------------------------------------------------------------------
# Health check (delegates to ModuleCoordinator.health_check)
# ---------------------------------------------------------------------------


def health_check(coordinator: ModuleCoordinator) -> bool:
    """Return the result of ``coordinator.health_check()``.

    Thin compatibility shim kept for existing callers; it adds no logic
    of its own.

    .. deprecated:: 0.1.0
        Use ``coordinator.health_check()`` directly.
    """
    all_alive = coordinator.health_check()
    return all_alive


# ---------------------------------------------------------------------------
# Daemonize (double-fork)
# ---------------------------------------------------------------------------


def daemonize(log_dir: Path) -> None:
    """Double-fork daemonize the current process.

    After this call the *caller* is the daemon grandchild; the two
    intermediate parents terminate via ``os._exit(0)``.
    stdin/stdout/stderr are redirected to ``/dev/null`` — all real
    logging goes through structlog's FileHandler to ``main.jsonl``.

    Args:
        log_dir: Directory that is created (including parents) before
            the first fork.
    """
    log_dir.mkdir(parents=True, exist_ok=True)

    # First fork — detach from terminal
    if os.fork() > 0:
        os._exit(0)

    os.setsid()

    # Second fork — can never reacquire a controlling terminal
    if os.fork() > 0:
        os._exit(0)

    # Redirect all stdio to /dev/null — structlog FileHandler is the log path
    sys.stdout.flush()
    sys.stderr.flush()

    # Open read/write: a read-only descriptor duplicated onto stdout/stderr
    # would make every later write or flush fail with EBADF.
    devnull_fd = os.open(os.devnull, os.O_RDWR)
    target_fds = [stream.fileno() for stream in (sys.stdin, sys.stdout, sys.stderr)]
    try:
        for fd in target_fds:
            os.dup2(devnull_fd, fd)
    finally:
        # Do not close the descriptor if it *is* one of the stdio slots
        # (possible when a standard stream was already closed before the call).
        if devnull_fd not in target_fds:
            os.close(devnull_fd)


# ---------------------------------------------------------------------------
# Signal handler for clean shutdown
# ---------------------------------------------------------------------------


def install_signal_handlers(entry: RunEntry, coordinator: ModuleCoordinator) -> None:
    """Register one shared handler for SIGTERM and SIGINT.

    When either signal arrives the handler stops *coordinator* (logging,
    but not propagating, any exception), removes *entry* from the
    registry and exits the process with status 0.
    """

    def _shutdown(signum: int, frame: object) -> None:
        logger.info("Received signal, shutting down", signal=signum)
        try:
            coordinator.stop()
        except Exception:
            # Best effort: the registry entry must still be removed below.
            logger.error("Error during coordinator stop", exc_info=True)
        entry.remove()
        raise SystemExit(0)

    for handled in (signal.SIGTERM, signal.SIGINT):
        signal.signal(handled, _shutdown)
2 changes: 2 additions & 0 deletions dimos/core/global_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class GlobalConfig(BaseSettings):
robot_rotation_diameter: float = 0.6
planner_strategy: NavigationStrategy = "simple"
planner_robot_speed: float | None = None
mcp_port: int = 9990
mcp_host: str = "0.0.0.0"
dtop: bool = False

model_config = SettingsConfigDict(
Expand Down
35 changes: 35 additions & 0 deletions dimos/core/module_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from dimos.core.module import Module, ModuleT
from dimos.core.resource_monitor.monitor import StatsMonitor
from dimos.core.rpc_client import ModuleProxy
from dimos.core.worker import Worker

logger = setup_logger()

Expand All @@ -49,6 +50,40 @@ def __init__(
self._global_config = cfg
self._deployed_modules = {}

@property
def workers(self) -> list[Worker]:
    """Workers exposed by the current client.

    Returns an empty list while ``self._client`` is ``None``.
    """
    client = self._client
    return [] if client is None else client.workers

@property
def n_workers(self) -> int:
    """Length of ``self.workers``."""
    active = self.workers
    return len(active)

def health_check(self) -> bool:
    """Confirm that workers exist and none reports a missing PID.

    Since ``blueprint.build()`` is synchronous, every module should be
    started by the time this runs, so the check only looks for workers
    that have died.

    Returns:
        ``False`` (after logging an error) when there are no workers or
        when some worker's ``pid`` is ``None``; ``True`` otherwise.
    """
    if not self.n_workers:
        logger.error("health_check: no workers found")
        return False

    # First worker without a PID, if any.
    dead = next((w for w in self.workers if w.pid is None), None)
    if dead is not None:
        logger.error("health_check: worker died", worker_id=dead.worker_id)
        return False

    return True

@property
def n_modules(self) -> int:
    """Size of the ``self._deployed_modules`` mapping."""
    deployed = self._deployed_modules
    return len(deployed)

def start(self) -> None:
n = self._n if self._n is not None else 2
self._client = WorkerManager(n_workers=n)
Expand Down
181 changes: 181 additions & 0 deletions dimos/core/run_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Run registry for tracking DimOS daemon processes."""

from __future__ import annotations

from dataclasses import asdict, dataclass, field
import json
import os
from pathlib import Path
import re
import time

from dimos.utils.logging_config import setup_logger

logger = setup_logger()


def _get_state_dir() -> Path:
    """Return the dimos state directory, honouring ``XDG_STATE_HOME``.

    Uses ``$XDG_STATE_HOME/dimos`` when the variable is set and non-empty,
    otherwise ``~/.local/state/dimos``.
    """
    xdg_home = os.environ.get("XDG_STATE_HOME")
    base = Path(xdg_home) if xdg_home else Path.home() / ".local" / "state"
    return base / "dimos"


# Both paths are resolved once, at import time, from the state directory.
# Directory holding one ``<run_id>.json`` registry file per run.
REGISTRY_DIR = _get_state_dir() / "runs"
# Base directory for logs. NOTE(review): presumably the parent of the
# per-run log directories; it is not used within this module — confirm
# against callers.
LOG_BASE_DIR = _get_state_dir() / "logs"


@dataclass
class RunEntry:
    """Metadata for a single DimOS run (daemon or foreground).

    Each entry is persisted as ``<run_id>.json`` inside ``REGISTRY_DIR``.
    """

    run_id: str  # also determines the registry file name
    pid: int  # PID probed/signalled by the liveness and stop helpers
    blueprint: str
    started_at: str
    log_dir: str
    cli_args: list[str] = field(default_factory=list)
    config_overrides: dict[str, object] = field(default_factory=dict)
    grpc_port: int = 9877

    @property
    def registry_path(self) -> Path:
        """Path of the JSON file backing this entry."""
        return REGISTRY_DIR / f"{self.run_id}.json"

    def save(self) -> None:
        """Persist this entry to disk atomically.

        The JSON is written to a temporary sibling file and then moved into
        place with ``os.replace``, so a concurrent reader never observes a
        partially written entry (which would otherwise be treated as corrupt
        and deleted by the listing helpers).
        """
        REGISTRY_DIR.mkdir(parents=True, exist_ok=True)
        target = self.registry_path
        payload = json.dumps(asdict(self), indent=2)
        # The ".tmp" suffix keeps the staging file out of "*.json" globs.
        staging = target.with_name(f"{target.name}.{os.getpid()}.tmp")
        try:
            staging.write_text(payload)
            os.replace(staging, target)
        finally:
            # No-op after a successful replace; cleans up after a failure.
            staging.unlink(missing_ok=True)

    def remove(self) -> None:
        """Delete this entry from disk; a missing file is not an error."""
        self.registry_path.unlink(missing_ok=True)

    @classmethod
    def load(cls, path: Path) -> RunEntry:
        """Load a RunEntry from a JSON file.

        Raises:
            OSError: If *path* cannot be read.
            ValueError: If the content is not valid JSON.
            TypeError: If the decoded keys do not match the dataclass fields.
        """
        data = json.loads(path.read_text())
        return cls(**data)


def generate_run_id(blueprint: str) -> str:
    """Build a run ID of the form ``YYYYMMDD-HHMMSS-<blueprint>``.

    Every character of *blueprint* outside ``[a-zA-Z0-9_-]`` is replaced
    by ``-``. The timestamp is the current local time.
    """
    stamp = time.strftime("%Y%m%d-%H%M%S", time.localtime())
    sanitized = re.sub(r"[^a-zA-Z0-9_-]", "-", blueprint)
    return "-".join((stamp, sanitized))


def is_pid_alive(pid: int) -> bool:
    """Check whether a process with the given PID is still running.

    Args:
        pid: Process ID to probe with signal 0.

    Returns:
        ``True`` if the process exists (even when we lack permission to
        signal it); ``False`` if it does not exist or *pid* cannot name a
        single process.
    """
    # pid <= 0 makes os.kill address process groups (or every process)
    # rather than one process, so it can never identify a registered run.
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # Process exists but we can't signal it — still alive.
        return True
    except OverflowError:
        # Value does not fit the platform's pid type — no such process.
        return False
    return True


def list_runs(alive_only: bool = True) -> list[RunEntry]:
    """List all registered runs, optionally filtering to alive processes.

    Registry files are visited in file-name order. Files that cannot be
    parsed are deleted; when *alive_only* is true, files whose recorded PID
    is no longer alive are deleted as well.

    Args:
        alive_only: Drop (and delete from disk) entries whose process is gone.

    Returns:
        The surviving entries, ordered by registry file name.
    """
    REGISTRY_DIR.mkdir(parents=True, exist_ok=True)
    entries: list[RunEntry] = []
    for f in sorted(REGISTRY_DIR.glob("*.json")):
        try:
            entry = RunEntry.load(f)
        except FileNotFoundError:
            # Removed by another process between the glob and the read.
            continue
        except Exception:
            logger.warning("Corrupt registry entry, removing", path=str(f))
            f.unlink(missing_ok=True)
            continue

        if alive_only and not is_pid_alive(entry.pid):
            logger.info("Cleaning stale run entry", run_id=entry.run_id, pid=entry.pid)
            # Delete the file that was actually read, not a path rebuilt from
            # the run_id stored inside it.
            f.unlink(missing_ok=True)
            continue
        entries.append(entry)
    return entries


def cleanup_stale() -> int:
    """Remove registry entries for dead processes.

    Entries that cannot be loaded or probed are treated as stale too.

    Returns:
        Number of registry files removed by this call.
    """
    REGISTRY_DIR.mkdir(parents=True, exist_ok=True)
    removed = 0
    # Snapshot the listing first because files are deleted inside the loop.
    for f in list(REGISTRY_DIR.glob("*.json")):
        try:
            entry = RunEntry.load(f)
            stale = not is_pid_alive(entry.pid)
        except FileNotFoundError:
            # Already removed by another process; nothing left to clean.
            continue
        except Exception:
            stale = True

        if stale:
            # Delete the scanned file itself so an entry whose stored run_id
            # disagrees with its file name is still cleaned up.
            f.unlink(missing_ok=True)
            removed += 1
    return removed


def check_port_conflicts(grpc_port: int = 9877) -> RunEntry | None:
    """Find an alive run already registered on *grpc_port*.

    Returns the first matching entry from ``list_runs(alive_only=True)``,
    or ``None`` when no alive run records that port.
    """
    alive_runs = list_runs(alive_only=True)
    return next((run for run in alive_runs if run.grpc_port == grpc_port), None)


def get_most_recent(alive_only: bool = True) -> RunEntry | None:
    """Return the last entry reported by ``list_runs``, or ``None`` if empty.

    ``list_runs`` orders entries by registry file name, so with
    timestamp-prefixed run IDs the last one is the most recent run.
    """
    candidates = list_runs(alive_only=alive_only)
    if not candidates:
        return None
    return candidates[-1]


import signal


def _wait_for_exit(pid: int, attempts: int, interval: float = 0.1) -> bool:
    """Poll *pid* up to *attempts* times; return True once it is gone."""
    for _ in range(attempts):
        if not is_pid_alive(pid):
            return True
        time.sleep(interval)
    return False


def stop_entry(entry: RunEntry, force: bool = False) -> tuple[str, bool]:
    """Stop a DimOS instance by registry entry.

    Sends SIGTERM (or SIGKILL when *force* is true). For a graceful stop the
    process gets about 5 seconds to exit before the signal is escalated to
    SIGKILL, followed by a further wait of about 2 seconds.

    Args:
        entry: Registry entry naming the process to stop.
        force: Send SIGKILL immediately instead of SIGTERM.

    Returns:
        ``(message, success)`` for the CLI to display. ``success`` is
        ``False`` when the recorded PID is invalid or the signal could not
        be delivered because of missing permissions.
    """
    # os.kill with pid <= 0 targets the caller's process group (0) or every
    # process we may signal (-1); never forward such a value from the registry.
    if entry.pid <= 0:
        entry.remove()
        return (f"Invalid PID {entry.pid} in registry entry, cleaning registry", False)

    sig = signal.SIGKILL if force else signal.SIGTERM
    sig_name = "SIGKILL" if force else "SIGTERM"

    try:
        os.kill(entry.pid, sig)
    except ProcessLookupError:
        entry.remove()
        return ("Process already dead, cleaning registry", True)
    except PermissionError:
        # The process exists but belongs to someone else: keep the entry and
        # report the failure instead of raising into the CLI.
        return (f"Permission denied sending {sig_name} to PID {entry.pid}", False)

    if not force and not _wait_for_exit(entry.pid, attempts=50):  # 5 seconds
        try:
            os.kill(entry.pid, signal.SIGKILL)
        except ProcessLookupError:
            # Exited between the last poll and the escalation.
            pass
        else:
            _wait_for_exit(entry.pid, attempts=20)
        entry.remove()
        return (f"Escalated to SIGKILL after {sig_name} timeout", True)

    entry.remove()
    return (f"Stopped with {sig_name}", True)
Loading
Loading