Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 282 additions & 1 deletion dlclivegui/cameras/backends/aravis_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import cv2
import numpy as np

from ...config import CameraSettings
from ..base import CameraBackend, SupportLevel, register_backend
from ..factory import DetectedCamera

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,7 +42,7 @@ def __init__(self, settings):
if not isinstance(ns, dict):
ns = {}

self._camera_id: str | None = ns.get("camera_id") or props.get("camera_id")
self._camera_id: str | None = ns.get("device_id") or props.get("device_id")
self._pixel_format: str = ns.get("pixel_format") or props.get("pixel_format", "Mono8")
self._timeout: int = int(ns.get("timeout", props.get("timeout", 2_000_000)))
self._n_buffers: int = int(ns.get("n_buffers", props.get("n_buffers", 10)))
Expand Down Expand Up @@ -103,6 +105,153 @@ def get_device_count(cls) -> int:
except Exception:
return -1

@classmethod
def quick_ping(cls, index: int, *_args, **_kwargs) -> bool:
"""
Cheap presence test for CameraFactory probing.
Uses update_device_list() then bounds-check.
"""
if not ARAVIS_AVAILABLE:
return False
try:
Aravis.update_device_list()
n = int(Aravis.get_n_devices() or 0)
return 0 <= int(index) < n
except Exception:
return False

@classmethod
def discover_devices(cls, max_devices: int = 10, should_cancel=None, progress_cb=None):
if not ARAVIS_AVAILABLE:
return []

# Refresh list once; indices may change after update_device_list()
Aravis.update_device_list()

snap = cls._arv_snapshot_devices(limit=max_devices)

cams: list[DetectedCamera] = []
for d in snap:
if should_cancel and should_cancel():
break
if progress_cb:
progress_cb(f"Found {d['label']}")

path = d.get("physical_id") or d.get("address")

cams.append(
DetectedCamera(
index=int(d["index"]),
label=str(d["label"]),
device_id=d.get("device_id"),
path=path,
)
)
return cams

@classmethod
def rebind_settings(cls, settings: CameraSettings) -> CameraSettings:
"""
Best-effort quick rebind using only Aravis enumeration APIs (no camera open).
Indices may change after Aravis.update_device_list().
"""
if not ARAVIS_AVAILABLE:
return settings

props = settings.properties if isinstance(settings.properties, dict) else {}
ns = props.get(cls.OPTIONS_KEY, {}) if isinstance(props.get(cls.OPTIONS_KEY), dict) else {}

# Stored identifiers (some may be missing)
stored_device_id = cls._safe_str(
ns.get("device_id") or props.get("device_id") or ns.get("camera_id") or props.get("camera_id")
)
stored_physical = cls._safe_str(
ns.get("device_physical_id") or ns.get("device_path") or props.get("device_path")
)
stored_vendor = cls._safe_str(ns.get("device_vendor"))
stored_model = cls._safe_str(ns.get("device_model"))
stored_serial = cls._safe_str(ns.get("device_serial_nbr") or ns.get("device_serial"))
stored_name = cls._safe_str(ns.get("device_name"))

# Nothing to rebind with
if not any(
[stored_device_id, stored_physical, (stored_vendor and stored_model and stored_serial), stored_name]
):
return settings

try:
Aravis.update_device_list() # must be called before get_device_*
snap = cls._arv_snapshot_devices(limit=None)

# 1) device_id exact match (fast)
chosen = None
if stored_device_id:
for d in snap:
if d.get("device_id") == stored_device_id:
chosen = d
break

# 2) physical_id exact match
if chosen is None and stored_physical:
for d in snap:
if d.get("physical_id") == stored_physical or d.get("address") == stored_physical:
chosen = d
break

# 3) vendor/model/serial exact triple match
if chosen is None and stored_vendor and stored_model and stored_serial:
for d in snap:
if (d.get("vendor"), d.get("model"), d.get("serial")) == (
stored_vendor,
stored_model,
stored_serial,
):
chosen = d
break

# 4) name substring match against computed label
if chosen is None and stored_name:
needle = stored_name.lower()
for d in snap:
label = (d.get("label") or "").lower()
if needle and needle in label:
chosen = d
break

# 5) fallback to current index if still plausible
if chosen is None:
idx = int(getattr(settings, "index", 0) or 0)
if 0 <= idx < len(snap):
chosen = snap[idx]
else:
return settings

# Apply new index
settings.index = int(chosen["index"])

# Refresh namespace fields (keeps GUI stable identity fresh)
if isinstance(settings.properties, dict):
out = settings.properties.setdefault(cls.OPTIONS_KEY, {})
if isinstance(out, dict):
out["device_id"] = chosen.get("device_id")
out["device_physical_id"] = chosen.get("physical_id")
out["device_vendor"] = chosen.get("vendor")
out["device_model"] = chosen.get("model")
out["device_serial_nbr"] = chosen.get("serial")
out["device_protocol"] = chosen.get("protocol")
out["device_address"] = chosen.get("address")
out["device_name"] = chosen.get("label") # computed label (no open)

# also keep 'device_path' aligned with physical id for GUI fallback
if chosen.get("physical_id"):
out["device_path"] = chosen.get("physical_id")

return settings

except Exception:
# Never hard-fail creation just because rebinding couldn't happen
return settings

def open(self) -> None:
if not ARAVIS_AVAILABLE:
raise RuntimeError("Aravis library not available")
Expand All @@ -120,11 +269,68 @@ def open(self) -> None:
raise RuntimeError(f"Camera index {index} out of range for {n_devices} Aravis device(s)")
camera_id = Aravis.get_device_id(index)
self._camera = Aravis.Camera.new(camera_id)
self._camera_id = self._safe_str(camera_id)

if self._camera is None:
raise RuntimeError("Failed to open Aravis camera")

# --- Refresh identity and align index (best-effort, no heavy open needed) ---
try:
snap = self._arv_snapshot_devices(limit=None)

opened_id = self._camera_id
if opened_id is None:
# Opened by index
try:
opened_id = self._safe_str(Aravis.get_device_id(int(self.settings.index)))
except Exception:
opened_id = None

chosen = None
if opened_id:
for d in snap:
if d.get("device_id") == opened_id:
chosen = d
break

# If we found it, align settings.index and refresh identity cache
if chosen:
self.settings.index = int(chosen["index"])
if isinstance(self.settings.properties, dict):
ns = self.settings.properties.setdefault(self.OPTIONS_KEY, {})
if isinstance(ns, dict):
ns["device_id"] = chosen.get("device_id")
ns["device_physical_id"] = chosen.get("physical_id")
ns["device_vendor"] = chosen.get("vendor")
ns["device_model"] = chosen.get("model")
ns["device_serial_nbr"] = chosen.get("serial")
ns["device_protocol"] = chosen.get("protocol")
ns["device_address"] = chosen.get("address")
ns["device_path"] = chosen.get("physical_id") or chosen.get("address")
else:
if isinstance(self.settings.properties, dict):
ns = self.settings.properties.setdefault(self.OPTIONS_KEY, {})
if isinstance(ns, dict):
ns["device_id"] = opened_id
except Exception:
pass

# Compute higher-quality label from the opened camera object
self._device_label = self._resolve_device_label()
# Always populate minimal identity into backend namespace for GUI
if isinstance(self.settings.properties, dict):
ns = self.settings.properties.setdefault(self.OPTIONS_KEY, {})
if isinstance(ns, dict):
# Always write a device_id after a successful open
try:
if self._camera_id:
ns["device_id"] = self._camera_id
else:
ns["device_id"] = self._safe_str(Aravis.get_device_id(int(self.settings.index)))
except Exception:
pass
if self._device_label:
ns["device_name"] = self._device_label

self._configure_pixel_format()
self._configure_resolution()
Expand Down Expand Up @@ -261,6 +467,81 @@ def device_name(self) -> str:
# ------------------------------------------------------------------
# Configuration helpers
# ------------------------------------------------------------------
@staticmethod
def _safe_str(x) -> str | None:
try:
if x is None:
return None
s = str(x).strip()
return s if s else None
except Exception:
return None

@classmethod
def _arv_snapshot_devices(cls, limit: int | None = None) -> list[dict]:
"""
Fast snapshot of the current Aravis device list without opening cameras.
Requires Aravis.update_device_list() before calling.
"""
n = int(Aravis.get_n_devices() or 0) # valid until next update_device_list()
if limit is not None:
n = min(n, int(limit))

devices: list[dict] = []
for i in range(n):
try:
dev_id = cls._safe_str(Aravis.get_device_id(i))
except Exception:
dev_id = None

try:
physical = cls._safe_str(Aravis.get_device_physical_id(i))
except Exception:
physical = None
try:
vendor = cls._safe_str(Aravis.get_device_vendor(i))
except Exception:
vendor = None
try:
model = cls._safe_str(Aravis.get_device_model(i))
except Exception:
model = None
try:
serial = cls._safe_str(Aravis.get_device_serial_nbr(i))
except Exception:
serial = None
try:
protocol = cls._safe_str(Aravis.get_device_protocol(i))
except Exception:
protocol = None
try:
address = cls._safe_str(Aravis.get_device_address(i))
except Exception:
address = None

# Construct a stable-ish human label without opening the camera
label_parts = [p for p in (vendor, model) if p]
label = " ".join(label_parts) if label_parts else None
if serial:
label = f"{label} ({serial})" if label else f"({serial})"
if not label:
label = dev_id or f"Aravis #{i}"

devices.append(
{
"index": int(i),
"device_id": dev_id,
"physical_id": physical,
"vendor": vendor,
"model": model,
"serial": serial,
"protocol": protocol,
"address": address,
"label": label,
}
)
return devices

def _get_requested_resolution_or_none(self) -> tuple[int, int] | None:
"""
Return (w, h) if user explicitly requested a resolution.
Expand Down
Loading