Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 57 additions & 2 deletions sdks/python/agenta/sdk/engines/running/runners/daytona.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from contextlib import contextmanager
from typing import Any, Dict, Generator, Union, Optional, TYPE_CHECKING

import httpx

import agenta as ag
from agenta.sdk.engines.running.runners.base import CodeRunner
from agenta.sdk.contexts.running import RunningContext
from agenta.sdk.utils.cache import TTLLRUCache
from agenta.sdk.utils.lazy import _load_daytona

from agenta.sdk.utils.logging import get_module_logger
Expand Down Expand Up @@ -57,6 +60,12 @@ class DaytonaRunner(CodeRunner):

_instance: Optional["DaytonaRunner"] = None

# Snapshot name lookup is org-scoped on Daytona's side, so a `general: true`
# snapshot owned by another org can't be resolved by name from our org —
# only by ID. Cache the (name, target) -> id mapping for a day to avoid
# listing snapshots on every sandbox create.
_snapshot_id_cache: TTLLRUCache = TTLLRUCache(capacity=32, ttl=86400)

def __new__(cls):
"""Singleton pattern to reuse Daytona client and sandbox."""
if cls._instance is None:
Expand Down Expand Up @@ -155,6 +164,49 @@ def _get_provider_env_vars(self) -> Dict[str, str]:

return env_vars

def _resolve_snapshot_id(self, name: str, target: str) -> str:
"""Resolve a snapshot name to its ID for the given region.

Sandbox creation by snapshot *name* only resolves snapshots owned by
the requesting org, even when the snapshot is marked ``general: true``
and visible in the dashboard. Resolving by *ID* works cross-org, so we
list snapshots, find one matching name + region + active state, and
cache the result.
"""
Comment thread
jp-agenta marked this conversation as resolved.
cache_key = (name, target)
cached = self._snapshot_id_cache.get(cache_key)
if cached is not None:
return cached

api_url = os.getenv("DAYTONA_API_URL") or "https://app.daytona.io/api"
api_key = os.getenv("DAYTONA_API_KEY")

# We don't paginate: we assume orgs have <=25 snapshots. If an org
# exceeds that and the target snapshot falls off page 1, this raises
# "not found" — by design, we fail rather than retry or scan further.
response = httpx.get(
f"{api_url.rstrip('/')}/snapshots",
params={"limit": 25},
headers={"Authorization": f"Bearer {api_key}"},
timeout=10.0,
)
response.raise_for_status()
items = response.json().get("items", [])
Comment thread
jp-agenta marked this conversation as resolved.

for item in items:
if (
item.get("name") == name
and item.get("state") == "active"
and target in (item.get("regionIds") or [])
):
snapshot_id = item["id"]
self._snapshot_id_cache.put(cache_key, snapshot_id)
return snapshot_id

Comment thread
jp-agenta marked this conversation as resolved.
raise RuntimeError(
f"No active Daytona snapshot named '{name}' found in region '{target}'."
)

def _create_sandbox(self, runtime: Optional[str] = None) -> Any:
"""Create a new sandbox for this run from snapshot.

Expand All @@ -169,14 +221,17 @@ def _create_sandbox(self, runtime: Optional[str] = None) -> Any:
runtime = runtime or "python"

# Select general snapshot
snapshot_id = os.getenv("DAYTONA_SNAPSHOT")
snapshot_ref = os.getenv("DAYTONA_SNAPSHOT")

if not snapshot_id:
if not snapshot_ref:
raise RuntimeError(
f"No Daytona snapshot configured for runtime '{runtime}'. "
f"Set DAYTONA_SNAPSHOT environment variable."
)

target = os.getenv("DAYTONA_TARGET") or os.getenv("AGENTA_REGION") or "eu"
snapshot_id = self._resolve_snapshot_id(snapshot_ref, target)

Comment thread
junaway marked this conversation as resolved.
_, _, _, CreateSandboxFromSnapshotParams = _load_daytona()

agenta_host = (
Expand Down
112 changes: 112 additions & 0 deletions sdks/python/oss/tests/pytest/unit/test_daytona_resolve_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from unittest.mock import MagicMock

import httpx
import pytest

from agenta.sdk.engines.running.runners.daytona import DaytonaRunner


def _make_response(items):
response = MagicMock()
response.raise_for_status = MagicMock()
response.json.return_value = {"items": items}
return response


@pytest.fixture
def runner(monkeypatch):
monkeypatch.setenv("DAYTONA_API_KEY", "test-key")
monkeypatch.delenv("DAYTONA_API_URL", raising=False)
r = DaytonaRunner()
# Singleton bleed-through: ensure a clean cache between tests.
r._snapshot_id_cache.cache.clear()
return r


def test_resolve_snapshot_id_returns_matching_active_snapshot(runner, monkeypatch):
calls = []

def fake_get(url, params=None, headers=None, timeout=None):
calls.append((url, params, headers))
return _make_response(
[
{"id": "id-1", "name": "other", "state": "active", "regionIds": ["eu"]},
{"id": "id-2", "name": "snap", "state": "active", "regionIds": ["eu"]},
{"id": "id-3", "name": "snap", "state": "active", "regionIds": ["us"]},
]
)

monkeypatch.setattr(httpx, "get", fake_get)

assert runner._resolve_snapshot_id("snap", "eu") == "id-2"
assert len(calls) == 1
assert calls[0][1] == {"limit": 25}
assert calls[0][2] == {"Authorization": "Bearer test-key"}


def test_resolve_snapshot_id_uses_cache_on_second_call(runner, monkeypatch):
call_count = {"n": 0}

def fake_get(url, params=None, headers=None, timeout=None):
call_count["n"] += 1
return _make_response(
[{"id": "id-1", "name": "snap", "state": "active", "regionIds": ["eu"]}]
)

monkeypatch.setattr(httpx, "get", fake_get)

assert runner._resolve_snapshot_id("snap", "eu") == "id-1"
assert runner._resolve_snapshot_id("snap", "eu") == "id-1"
assert call_count["n"] == 1


def test_resolve_snapshot_id_caches_per_target(runner, monkeypatch):
responses_by_target = {
"eu": [{"id": "eu-id", "name": "snap", "state": "active", "regionIds": ["eu"]}],
"us": [{"id": "us-id", "name": "snap", "state": "active", "regionIds": ["us"]}],
}
seen_targets = []

def fake_get(url, params=None, headers=None, timeout=None):
# The endpoint isn't target-scoped; we filter client-side.
# Return the union so per-target filtering is exercised.
seen_targets.append(params)
return _make_response(responses_by_target["eu"] + responses_by_target["us"])

monkeypatch.setattr(httpx, "get", fake_get)

assert runner._resolve_snapshot_id("snap", "eu") == "eu-id"
assert runner._resolve_snapshot_id("snap", "us") == "us-id"
# Two distinct cache keys → two HTTP calls.
assert len(seen_targets) == 2


def test_resolve_snapshot_id_skips_inactive_snapshots(runner, monkeypatch):
def fake_get(url, params=None, headers=None, timeout=None):
return _make_response(
[
{
"id": "id-1",
"name": "snap",
"state": "building",
"regionIds": ["eu"],
},
{"id": "id-2", "name": "snap", "state": "active", "regionIds": ["eu"]},
]
)

monkeypatch.setattr(httpx, "get", fake_get)

assert runner._resolve_snapshot_id("snap", "eu") == "id-2"


def test_resolve_snapshot_id_raises_when_no_match(runner, monkeypatch):
def fake_get(url, params=None, headers=None, timeout=None):
return _make_response(
[{"id": "id-1", "name": "other", "state": "active", "regionIds": ["eu"]}]
)

monkeypatch.setattr(httpx, "get", fake_get)

with pytest.raises(RuntimeError, match="No active Daytona snapshot named 'snap'"):
runner._resolve_snapshot_id("snap", "eu")
Loading