From 41bd18b59dcbdc536f4b0f6e8de22a022d089d84 Mon Sep 17 00:00:00 2001 From: Fernando Macedo Date: Thu, 19 Feb 2026 16:23:36 -0300 Subject: [PATCH 1/6] feat(scxml): add parsing and runtime support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Parse SCXML elements and wire them through the existing Python-level invoke infrastructure (IInvoke protocol, InvokeManager). Key changes: - New SCXMLInvoker handler (io/scxml/invoke.py) implementing IInvoke - Parser support for , , , , namelist - Processor wiring: InvokeDefinition → SCXMLInvoker → State(invoke=) - Engine hooks: handle_external_event for finalize/autoforward routing - Fix async processing loop early exit resolving caller_future - Fix finalize running even after invocation terminates (race condition) - 19 W3C invoke tests now passing (removed from xfail sets) --- statemachine/engines/async_.py | 7 +- statemachine/engines/sync.py | 17 +-- statemachine/invoke.py | 74 ++++++++-- statemachine/io/scxml/actions.py | 93 +++++++++++-- statemachine/io/scxml/invoke.py | 210 +++++++++++++++++++++++++++++ statemachine/io/scxml/parser.py | 96 ++++++++++++- statemachine/io/scxml/processor.py | 13 +- statemachine/io/scxml/schema.py | 19 ++- statemachine/statemachine.py | 2 + tests/scxml/conftest.py | 31 +---- tests/scxml/test_scxml_cases.py | 22 +++ tests/test_scxml_units.py | 10 +- 12 files changed, 519 insertions(+), 75 deletions(-) create mode 100644 statemachine/io/scxml/invoke.py diff --git a/statemachine/engines/async_.py b/statemachine/engines/async_.py index 7164d0ba..08cea579 100644 --- a/statemachine/engines/async_.py +++ b/statemachine/engines/async_.py @@ -346,7 +346,7 @@ async def processing_loop( # noqa: C901 first_result = self._sentinel try: took_events = True - while took_events: + while took_events and self.running: self.clear_cache() took_events = False macrostep_done = False @@ -406,6 +406,9 @@ async def processing_loop( # noqa: C901 ) break + # Finalize + autoforward for active invocations + self._invoke_manager.handle_external_event(external_event) + event_future = external_event.future try: enabled_transitions = await self.select_transitions(external_event) @@ -451,6 +454,8 @@ async def processing_loop( # noqa: C901 result = first_result if first_result is not self._sentinel else None # If the caller has a future, await it (already resolved by now). if caller_future is not None: + # Resolve the future if it wasn't processed (e.g. machine terminated). + self._resolve_future(caller_future, result) return await caller_future return result diff --git a/statemachine/engines/sync.py b/statemachine/engines/sync.py index fa5b9c86..787a2469 100644 --- a/statemachine/engines/sync.py +++ b/statemachine/engines/sync.py @@ -81,7 +81,7 @@ def processing_loop(self, caller_future=None): # noqa: C901 first_result = self._sentinel try: took_events = True - while took_events: + while took_events and self.running: self.clear_cache() took_events = False # Execute the triggers in the queue in FIFO order until the queue is empty @@ -136,18 +136,9 @@ def processing_loop(self, caller_future=None): # noqa: C901 break logger.debug("External event: %s", external_event.event) - # # TODO: Handle cancel event - # if self.is_cancel_event(external_event): - # self.running = False - # return - - # TODO: Invoke states - # for state in self.configuration: - # for inv in state.invoke: - # if inv.invokeid == external_event.invokeid: - # self.apply_finalize(inv, external_event) - # if inv.autoforward: - # self.send(inv.id, external_event) + + # Finalize + autoforward for active invocations + self._invoke_manager.handle_external_event(external_event) enabled_transitions = self.select_transitions(external_event) logger.debug("Enabled transitions: %s", enabled_transitions) diff --git a/statemachine/invoke.py b/statemachine/invoke.py index 3ac34fb8..b66632f7 100644 --- a/statemachine/invoke.py +++ b/statemachine/invoke.py @@ -290,7 +290,7 @@ def cancel_for_state(self, state: "State"): for inv_id, inv in list(self._active.items()): if inv.state_id == state.id and not inv.terminated: self._cancel(inv_id) - self._pending = [(s, kw) for s, kw in self._pending if s is not state] + self._pending = [(s, kw) for s, kw in self._pending if s.id != state.id] def cancel_all(self): """Cancel all active invocations.""" @@ -314,12 +314,13 @@ def spawn_pending_sync(self): def _spawn_one_sync(self, callback: "CallbackWrapper", **kwargs): state: "State" = kwargs["state"] event_kwargs: dict = kwargs.get("event_kwargs", {}) - ctx = self._make_context(state, event_kwargs) - invocation = Invocation(invokeid=ctx.invokeid, state_id=state.id, ctx=ctx) # Use meta.func to find the original (unwrapped) handler; the callback # system wraps everything in a signature_adapter closure. handler = self._resolve_handler(callback.meta.func) + ctx = self._make_context(state, event_kwargs, handler=handler) + invocation = Invocation(invokeid=ctx.invokeid, state_id=state.id, ctx=ctx) + invocation._handler = handler self._active[ctx.invokeid] = invocation @@ -347,11 +348,10 @@ def _run_sync_handler( self.sm.send( f"done.invoke.{ctx.invokeid}", data=result, - internal=True, ) except Exception as e: if not ctx.cancelled.is_set(): - self.sm.send("error.execution", error=e, internal=True) + self.sm.send("error.execution", error=e) finally: invocation.terminated = True @@ -372,10 +372,11 @@ async def spawn_pending_async(self): def _spawn_one_async(self, callback: "CallbackWrapper", **kwargs): state: "State" = kwargs["state"] event_kwargs: dict = kwargs.get("event_kwargs", {}) - ctx = self._make_context(state, event_kwargs) - invocation = Invocation(invokeid=ctx.invokeid, state_id=state.id, ctx=ctx) handler = self._resolve_handler(callback.meta.func) + ctx = self._make_context(state, event_kwargs, handler=handler) + invocation = Invocation(invokeid=ctx.invokeid, state_id=state.id, ctx=ctx) + invocation._handler = handler self._active[ctx.invokeid] = invocation @@ -404,7 +405,6 @@ async def _run_async_handler( self.sm.send( f"done.invoke.{ctx.invokeid}", data=result, - internal=True, ) except asyncio.CancelledError: # Intentionally swallowed: the owning state was exited, so this @@ -412,7 +412,7 @@ async def _run_async_handler( return except Exception as e: if not ctx.cancelled.is_set(): - self.sm.send("error.execution", error=e, internal=True) + self.sm.send("error.execution", error=e) finally: invocation.terminated = True @@ -434,8 +434,55 @@ def _cancel(self, invokeid: str): # --- Helpers --- - def _make_context(self, state: "State", event_kwargs: "dict | None" = None) -> InvokeContext: - invokeid = f"{state.id}.{uuid.uuid4().hex[:8]}" + def handle_external_event(self, trigger_data) -> None: + """Run finalize blocks and autoforward for active invocations. + + Called by the engine before processing each external event. + For each active invocation whose handler has ``on_finalize`` or + ``on_event`` (autoforward), delegate accordingly. + """ + event_name = str(trigger_data.event) if trigger_data.event else None + if event_name is None: + return + + # Tag done.invoke events with the invokeid + if event_name.startswith("done.invoke."): + invokeid = event_name[len("done.invoke.") :] + trigger_data.kwargs.setdefault("_invokeid", invokeid) + + for inv in list(self._active.values()): + handler = inv._handler + if handler is None: + continue + + # Check if event originates from this invocation + is_from_child = trigger_data.kwargs.get( + "_invokeid" + ) == inv.invokeid or event_name.startswith(f"done.invoke.{inv.invokeid}") + + # Finalize: run the finalize block if the event came from this invocation. + # Note: finalize must run even after the invocation terminates, because + # child events may still be queued when the handler thread completes. + if is_from_child and hasattr(handler, "on_finalize"): + handler.on_finalize(trigger_data) + + # Autoforward: forward parent events to child (not events from child itself). + # Only forward if the invocation is still running. + if ( + not inv.terminated + and not is_from_child + and hasattr(handler, "autoforward") + and handler.autoforward + and hasattr(handler, "on_event") + ): + handler.on_event(event_name, **trigger_data.kwargs) + + def _make_context( + self, state: "State", event_kwargs: "dict | None" = None, handler: Any = None + ) -> InvokeContext: + # Use static invoke_id from handler if available (SCXML id= attribute) + static_id = getattr(handler, "invoke_id", None) if handler else None + invokeid = static_id or f"{state.id}.{uuid.uuid4().hex[:8]}" return InvokeContext( invokeid=invokeid, state_id=state.id, @@ -453,6 +500,11 @@ def _resolve_handler(underlying: Any) -> "Any | None": inner = underlying._invoke_handler if isinstance(inner, type) and issubclass(inner, StateChart): return StateChartInvoker(inner) + # Return the inner handler directly if it's an IInvoke instance + # (e.g., SCXMLInvoker) so duck-typed attributes like invoke_id are accessible. + # Exclude classes — @runtime_checkable matches classes that define run(). + if not isinstance(inner, type) and isinstance(inner, IInvoke): + return inner return underlying if isinstance(underlying, IInvoke): return underlying diff --git a/statemachine/io/scxml/actions.py b/statemachine/io/scxml/actions.py index 23f57868..db409344 100644 --- a/statemachine/io/scxml/actions.py +++ b/statemachine/io/scxml/actions.py @@ -111,9 +111,12 @@ class EventDataWrapper: def __init__(self, event_data): self.event_data = event_data - self.sendid = event_data.trigger_data.send_id - if event_data.trigger_data.event is None or event_data.trigger_data.event.internal: - if "error.execution" == event_data.trigger_data.event: + self.trigger_data = event_data.trigger_data + td = self.trigger_data + self.sendid = td.send_id + self.invokeid = td.kwargs.get("_invokeid", "") + if td.event is None or td.event.internal: + if "error.execution" == td.event: self.type = "platform" else: self.type = "internal" @@ -121,26 +124,56 @@ def __init__(self, event_data): else: self.type = "external" + @classmethod + def from_trigger_data(cls, trigger_data): + """Create an EventDataWrapper directly from a TriggerData (no EventData needed).""" + obj = cls.__new__(cls) + obj.event_data = None + obj.sendid = trigger_data.send_id + obj.trigger_data = trigger_data + obj.invokeid = trigger_data.kwargs.get("_invokeid", "") + event = trigger_data.event + if event is None or event.internal: + if "error.execution" == event: + obj.type = "platform" + else: + obj.type = "internal" + obj.origintype = "" + else: + obj.type = "external" + return obj + def __getattr__(self, name): - return getattr(self.event_data, name) + if self.event_data is not None: + return getattr(self.event_data, name) + raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") def __eq__(self, value): "This makes SCXML test 329 pass. It assumes that the event is the same instance" return isinstance(value, EventDataWrapper) + @property + def _trigger_data(self): + if self.event_data is not None: + return self.event_data.trigger_data + return self.trigger_data + @property def name(self): - return self.event_data.event + if self.event_data is not None: + return self.event_data.event + return str(self._trigger_data.event) if self._trigger_data.event else None @property def data(self): "Property used by the SCXML namespace" - if self.trigger_data.kwargs: - return _Data(self.trigger_data.kwargs) - elif self.trigger_data.args and len(self.trigger_data.args) == 1: - return self.trigger_data.args[0] - elif self.trigger_data.args: - return self.trigger_data.args + td = self._trigger_data + if td.kwargs: + return _Data(td.kwargs) + elif td.args and len(td.args) == 1: + return td.args[0] + elif td.args: + return td.args else: return None @@ -257,7 +290,10 @@ def __init__(self, action: AssignAction): def __call__(self, *args, **kwargs): machine: StateChart = kwargs["machine"] - value = _eval(self.action.expr, **kwargs) + if self.action.child_xml is not None: + value = self.action.child_xml + else: + value = _eval(self.action.expr, **kwargs) *path, attr = self.action.location.split(".") obj = machine.model @@ -364,6 +400,26 @@ def raise_action(*args, **kwargs): return raise_action +def _send_to_parent(action: SendAction, **kwargs): + """Route a to the parent machine via _invoke_session.""" + machine = kwargs["machine"] + session = getattr(machine, "_invoke_session", None) + if session is None: + return + event = action.event or _eval(action.eventexpr, **kwargs) # type: ignore[arg-type] + names = [] + for name in (action.namelist or "").strip().split(): + if not hasattr(machine.model, name): + raise NameError(f"Namelist variable '{name}' not found on model") + names.append(Param(name=name, expr=name)) + params_values = {} + for param in chain(names, action.params): + if param.expr is None: + continue + params_values[param.name] = _eval(param.expr, **kwargs) + session.send_to_parent(event, **params_values) + + def create_send_action_callable(action: SendAction) -> Callable: # noqa: C901 content: Any = () _valid_targets = (None, "#_internal", "internal", "#_parent", "parent") @@ -373,7 +429,7 @@ def create_send_action_callable(action: SendAction) -> Callable: # noqa: C901 except (NameError, SyntaxError, TypeError): content = (action.content,) - def send_action(*args, **kwargs): + def send_action(*args, **kwargs): # noqa: C901 machine: StateChart = kwargs["machine"] event = action.event or _eval(action.eventexpr, **kwargs) # type: ignore[arg-type] target = action.target if action.target else None @@ -393,6 +449,11 @@ def send_action(*args, **kwargs): raise ValueError(f"Invalid target: {target}. Must be one of {_valid_targets}") return + # Handle #_parent target — route to parent via _invoke_session + if target == "#_parent": + _send_to_parent(action, **kwargs) + return + internal = target in ("#_internal", "internal") send_id = None @@ -464,6 +525,12 @@ def _create_dataitem_callable(action: DataItem) -> Callable: def data_initializer(**kwargs): machine: StateChart = kwargs["machine"] + # Check for invoke param overrides — params from parent override child defaults + invoke_params = getattr(machine, "_invoke_params", None) + if invoke_params and action.id in invoke_params: + setattr(machine.model, action.id, invoke_params[action.id]) + return + if action.expr: try: value = _eval(action.expr, **kwargs) diff --git a/statemachine/io/scxml/invoke.py b/statemachine/io/scxml/invoke.py new file mode 100644 index 00000000..2a5005da --- /dev/null +++ b/statemachine/io/scxml/invoke.py @@ -0,0 +1,210 @@ +"""SCXML-specific invoke handler. + +Implements the IInvoke protocol by resolving child SCXML content (inline or +via src/srcexpr), evaluating params/namelist in the parent context, and managing +the child machine lifecycle including ``#_parent`` routing, autoforward, and +finalize. +""" + +import logging +import os +from pathlib import Path +from typing import TYPE_CHECKING +from typing import Any + +from ...invoke import IInvoke +from ...invoke import InvokeContext +from .actions import ExecuteBlock +from .actions import _eval +from .schema import InvokeDefinition + +if TYPE_CHECKING: + from .processor import SCXMLProcessor + +logger = logging.getLogger(__name__) + +_VALID_INVOKE_TYPES = { + None, + "scxml", + "http://www.w3.org/TR/scxml", + "http://www.w3.org/TR/scxml/", + "http://www.w3.org/TR/scxml/#SCXMLEventProcessor", +} + + +class SCXMLInvoker: + """SCXML-specific invoke handler implementing the IInvoke protocol. + + Resolves the child SCXML from inline content, src file, or srcexpr, + evaluates params/namelist, and manages the child machine lifecycle. + """ + + def __init__( + self, + definition: InvokeDefinition, + processor: "SCXMLProcessor", + ): + self._definition = definition + self._processor = processor + self._child: Any = None + self._base_dir: str = os.getcwd() + + # Duck-typed attributes for InvokeManager + self.invoke_id: "str | None" = definition.id + self.idlocation: "str | None" = definition.idlocation + self.autoforward: bool = definition.autoforward + + # Pre-compile finalize block + self._finalize_block: "ExecuteBlock | None" = None + if definition.finalize and not definition.finalize.is_empty: + self._finalize_block = ExecuteBlock(definition.finalize) + + def run(self, ctx: InvokeContext) -> Any: + """Create and run the child state machine.""" + machine = ctx.machine + + # Store invokeid in idlocation if specified + if self.idlocation: + setattr(machine.model, self.idlocation, ctx.invokeid) + + # Resolve invoke type + invoke_type = self._definition.type + if self._definition.typeexpr: + invoke_type = _eval(self._definition.typeexpr, machine=machine) + + if invoke_type not in _VALID_INVOKE_TYPES: + raise ValueError( + f"Unsupported invoke type: {invoke_type}. Supported types: {_VALID_INVOKE_TYPES}" + ) + + # Resolve child SCXML content + scxml_content = self._resolve_content(machine) + if scxml_content is None: + raise ValueError("No content resolved for ") + + # Evaluate params and namelist + invoke_params = self._evaluate_params(machine) + + # Parse and create the child machine + child_cls = self._create_child_class(scxml_content, ctx.invokeid) + + # Create child machine with param overrides and parent session reference. + # _invoke_session must be passed as a kwarg so it's available during + # the constructor (the child SM runs in __init__). + session = _InvokeSession(parent=machine, invokeid=ctx.invokeid) + self._child = child_cls( + _invoke_params=invoke_params, + _invoke_session=session, + ) + + # Wait for child to reach final state (it already ran in constructor) + # The child sends events to parent via #_parent routing. + return None + + def on_cancel(self): + """Cancel the child machine.""" + self._child = None + + def on_event(self, event_name: str, **data): + """Forward an event to the child machine (autoforward).""" + if self._child is not None and not self._child.is_terminated: + try: + self._child.send(event_name, **data) + except Exception: + logger.debug("Error forwarding event %s to child", event_name, exc_info=True) + + def on_finalize(self, trigger_data): + """Execute the finalize block before the parent processes the event.""" + if self._finalize_block is not None: + machine = trigger_data.machine + kwargs = { + "machine": machine, + "model": machine.model, + } + # Inject SCXML context variables + from .actions import EventDataWrapper + + kwargs.update( + {k: v for k, v in machine.model.__dict__.items() if not k.startswith("_")} + ) + # Build EventDataWrapper from trigger_data's kwargs + kwargs["_event"] = EventDataWrapper.from_trigger_data(trigger_data) + self._finalize_block(**kwargs) + + def _resolve_content(self, machine) -> "str | None": + """Resolve the child SCXML content from content/src/srcexpr.""" + defn = self._definition + + if defn.content: + # Content could be an expr to evaluate or inline SCXML + if defn.content.lstrip().startswith("<"): + return defn.content + # It's an expression — evaluate it + result = _eval(defn.content, machine=machine) + if isinstance(result, str): + return result + return str(result) + + if defn.srcexpr: + src = _eval(defn.srcexpr, machine=machine) + elif defn.src: + src = defn.src + else: + return None + + # Handle file: URIs and relative paths + if src.startswith("file:"): + path = Path(src.removeprefix("file:")) + else: + path = Path(src) + + # Resolve relative to the base directory of the parent SCXML file + if not path.is_absolute(): + path = Path(self._base_dir) / path + + return path.read_text() + + def _evaluate_params(self, machine) -> dict: + """Evaluate params and namelist into a dict of values.""" + defn = self._definition + result = {} + + # Evaluate namelist + if defn.namelist: + for name in defn.namelist.strip().split(): + if hasattr(machine.model, name): + result[name] = getattr(machine.model, name) + + # Evaluate param elements + for param in defn.params: + if param.expr is not None: + result[param.name] = _eval(param.expr, machine=machine) + elif param.location is not None: + result[param.name] = _eval(param.location, machine=machine) + + return result + + def _create_child_class(self, scxml_content: str, invokeid: str): + """Parse the child SCXML and create a machine class.""" + from .parser import parse_scxml + + child_name = f"invoke_{invokeid}" + definition = parse_scxml(scxml_content) + self._processor.process_definition(definition, location=child_name) + return self._processor.scs[child_name] + + +class _InvokeSession: + """Holds the reference to the parent machine for ``#_parent`` routing.""" + + def __init__(self, parent, invokeid: str): + self.parent = parent + self.invokeid = invokeid + + def send_to_parent(self, event: str, **data): + """Send an event to the parent machine's external queue.""" + self.parent.send(event, _invokeid=self.invokeid, **data) + + +# Verify protocol compliance at import time +assert isinstance(SCXMLInvoker.__new__(SCXMLInvoker), IInvoke) diff --git a/statemachine/io/scxml/parser.py b/statemachine/io/scxml/parser.py index 6c42208f..5fb482e0 100644 --- a/statemachine/io/scxml/parser.py +++ b/statemachine/io/scxml/parser.py @@ -15,6 +15,7 @@ from .schema import HistoryState from .schema import IfAction from .schema import IfBranch +from .schema import InvokeDefinition from .schema import LogAction from .schema import Param from .schema import RaiseAction @@ -84,10 +85,30 @@ def parse_scxml(scxml_content: str) -> StateMachineDefinition: # noqa: C901 return definition +def _find_own_datamodel_elements(root: ET.Element) -> List[ET.Element]: + """Find elements that belong to this SCXML document, not to inline children. + + Skips any nested inside elements (which contain inline + child SCXML documents for ). + """ + result: List[ET.Element] = [] + + def _walk(elem: ET.Element): + for child in elem: + if child.tag == "content": + continue # Skip inline SCXML content + if child.tag == "datamodel": + result.append(child) + _walk(child) + + _walk(root) + return result + + def parse_datamodel(root: ET.Element) -> "DataModel | None": data_model = DataModel() - for datamodel_elem in root.findall(".//datamodel"): + for datamodel_elem in _find_own_datamodel_elements(root): for data_elem in datamodel_elem.findall("data"): content = data_elem.text and re.sub(r"\s+", " ", data_elem.text).strip() or None src = data_elem.attrib.get("src") @@ -139,7 +160,10 @@ def parse_state( # noqa: C901 ) -> State: state_id = state_elem.get("id") if not state_id: - raise ValueError("State must have an 'id' attribute") + # Per SCXML spec, if no id is specified, the processor auto-generates one. + from uuid import uuid4 + + state_id = f"__auto_{uuid4().hex[:8]}" initial = state_id in initial_states state = State(id=state_id, initial=initial, final=is_final, parallel=is_parallel) @@ -192,6 +216,10 @@ def parse_state( # noqa: C901 child_history_state = parse_history(child_state_elem) state.history[child_history_state.id] = child_history_state + # Parse invoke elements + for invoke_elem in state_elem.findall("invoke"): + state.invocations.append(parse_invoke(invoke_elem)) + # Parse donedata (only valid on final states) if is_final: donedata_elem = state_elem.find("donedata") @@ -276,8 +304,16 @@ def parse_raise(element: ET.Element) -> RaiseAction: def parse_assign(element: ET.Element) -> AssignAction: location = element.attrib["location"] - expr = element.attrib["expr"] - return AssignAction(location=location, expr=expr) + expr = element.attrib.get("expr") + child_xml: "str | None" = None + if expr is None: + # Per SCXML spec, can have child content instead of expr + children = list(element) + if children: + child_xml = ET.tostring(children[0], encoding="unicode") + elif element.text: + expr = element.text.strip() + return AssignAction(location=location, expr=expr, child_xml=child_xml) def parse_log(element: ET.Element) -> LogAction: @@ -382,3 +418,55 @@ def parse_cancel(element: ET.Element) -> CancelAction: def parse_script(element: ET.Element) -> ScriptAction: content = element.text.strip() if element.text else "" return ScriptAction(content=content) + + +def parse_invoke(element: ET.Element) -> InvokeDefinition: + """Parse an element into an InvokeDefinition.""" + invoke_type = element.attrib.get("type") + typeexpr = element.attrib.get("typeexpr") + src = element.attrib.get("src") + srcexpr = element.attrib.get("srcexpr") + invoke_id = element.attrib.get("id") + idlocation = element.attrib.get("idlocation") + autoforward = element.attrib.get("autoforward", "false").lower() == "true" + namelist = element.attrib.get("namelist") + + params: List[Param] = [] + content: "str | None" = None + finalize: "ExecutableContent | None" = None + + for child in element: + if child.tag == "param": + name = child.attrib["name"] + expr = child.attrib.get("expr") + location = child.attrib.get("location") + params.append(Param(name=name, expr=expr, location=location)) + elif child.tag == "content": + # Check for inline element + scxml_child = child.find("{http://www.w3.org/2005/07/scxml}scxml") + if scxml_child is None: + scxml_child = child.find("scxml") + if scxml_child is not None: + # Serialize the inline SCXML back to string for later parsing + content = ET.tostring(scxml_child, encoding="unicode") + elif child.attrib.get("expr"): + # Dynamic content via expr attribute + content = child.attrib["expr"] + elif child.text: + content = re.sub(r"\s+", " ", child.text).strip() + elif child.tag == "finalize": + finalize = parse_executable_content(child) + + return InvokeDefinition( + type=invoke_type, + typeexpr=typeexpr, + src=src, + srcexpr=srcexpr, + id=invoke_id, + idlocation=idlocation, + autoforward=autoforward, + namelist=namelist, + params=params, + content=content, + finalize=finalize, + ) diff --git a/statemachine/io/scxml/processor.py b/statemachine/io/scxml/processor.py index fb0d6e82..38f3856d 100644 --- a/statemachine/io/scxml/processor.py +++ b/statemachine/io/scxml/processor.py @@ -19,8 +19,10 @@ from .actions import EventDataWrapper from .actions import ExecuteBlock from .actions import create_datamodel_action_callable +from .invoke import SCXMLInvoker from .parser import parse_scxml from .schema import HistoryState +from .schema import InvokeDefinition from .schema import State from .schema import Transition @@ -157,7 +159,7 @@ def _process_states(self, states: Dict[str, State]) -> Dict[str, StateDefinition states_dict[state_id] = self._process_state(state) return states_dict - def _process_state(self, state: State) -> StateDefinition: + def _process_state(self, state: State) -> StateDefinition: # noqa: C901 state_dict = StateDefinition() if state.initial: state_dict["initial"] = True @@ -184,6 +186,11 @@ def _process_state(self, state: State) -> StateDefinition: if state.transitions: state_dict["transitions"] = self._process_transitions(state.transitions) + # Process invoke elements + if state.invocations: + invokers = [self._process_invocation(inv) for inv in state.invocations] + state_dict["invoke"] = invokers # type: ignore[typeddict-unknown-key] + if state.states: state_dict["states"] = self._process_states(state.states) @@ -192,6 +199,10 @@ def _process_state(self, state: State) -> StateDefinition: return state_dict + def _process_invocation(self, invoke_def: InvokeDefinition) -> SCXMLInvoker: + """Convert an InvokeDefinition into an SCXMLInvoker.""" + return SCXMLInvoker(definition=invoke_def, processor=self) + def _process_transitions(self, transitions: List[Transition]): result: TransitionsList = [] for transition in transitions: diff --git a/statemachine/io/scxml/schema.py b/statemachine/io/scxml/schema.py index 1ec40018..0b1ec9ca 100644 --- a/statemachine/io/scxml/schema.py +++ b/statemachine/io/scxml/schema.py @@ -31,7 +31,8 @@ class RaiseAction(Action): @dataclass class AssignAction(Action): location: str - expr: str + expr: "str | None" = None + child_xml: "str | None" = None @dataclass @@ -114,6 +115,21 @@ class DoneData: content_expr: "str | None" = None +@dataclass +class InvokeDefinition: + type: "str | None" = None + typeexpr: "str | None" = None + src: "str | None" = None + srcexpr: "str | None" = None + id: "str | None" = None + idlocation: "str | None" = None + autoforward: bool = False + namelist: "str | None" = None + params: List[Param] = field(default_factory=list) + content: "str | None" = None + finalize: "ExecutableContent | None" = None + + @dataclass class State: id: str @@ -126,6 +142,7 @@ class State: states: Dict[str, "State"] = field(default_factory=dict) history: Dict[str, "HistoryState"] = field(default_factory=dict) donedata: "DoneData | None" = None + invocations: List[InvokeDefinition] = field(default_factory=list) @dataclass diff --git a/statemachine/statemachine.py b/statemachine/statemachine.py index 84518b7a..ddc9f971 100644 --- a/statemachine/statemachine.py +++ b/statemachine/statemachine.py @@ -142,6 +142,8 @@ def __init__( **kwargs: Any, ): self.model: TModel = model if model is not None else Model() # type: ignore[assignment] + self._invoke_params: "dict | None" = kwargs.pop("_invoke_params", None) + self._invoke_session: Any = kwargs.pop("_invoke_session", None) self.history_values: Dict[ str, List[State] ] = {} # Mapping of compound states to last active state(s). diff --git a/tests/scxml/conftest.py b/tests/scxml/conftest.py index 3fd34ce8..3aab408d 100644 --- a/tests/scxml/conftest.py +++ b/tests/scxml/conftest.py @@ -5,39 +5,16 @@ CURRENT_DIR = Path(__file__).parent TESTCASES_DIR = CURRENT_DIR -# xfail sets — all tests currently fail identically on both engines +# xfail sets — tests that fail identically on both engines XFAIL_BOTH = { - # mandatory — invoke-related - "test191", + # mandatory — invoke-related (still failing) + "test187", "test192", - "test207", - "test215", - "test216", - "test220", - "test223", - "test224", - "test225", - "test226", - "test228", "test229", - "test232", - "test233", - "test234", - "test235", "test236", - "test239", "test240", - "test241", - "test243", - "test244", - "test245", - "test247", "test253", - "test276", - "test338", - "test347", - "test422", - "test530", + "test554", # optional "test201", "test446", diff --git a/tests/scxml/test_scxml_cases.py b/tests/scxml/test_scxml_cases.py index ccb411da..c797b89c 100644 --- a/tests/scxml/test_scxml_cases.py +++ b/tests/scxml/test_scxml_cases.py @@ -1,3 +1,4 @@ +import time from pathlib import Path import pytest @@ -57,15 +58,35 @@ def _assert_passed(sm: StateChart): assert "pass" in {s.id for s in sm.configuration} +def _wait_for_completion(sm: StateChart, timeout_s: float = 5.0): + """Poll the processing loop until the SM reaches a final state or times out.""" + deadline = time.monotonic() + timeout_s + while not sm.is_terminated and time.monotonic() < deadline: + time.sleep(0.02) + # Trigger processing loop to handle events from invoke threads + sm._engine.processing_loop() + + def test_scxml_usecase_sync(testcase_path: Path, should_generate_debug_diagram, caplog): sm = _run_scxml_testcase( testcase_path, should_generate_debug_diagram, async_mode=False, ) + _wait_for_completion(sm) _assert_passed(sm) +async def _async_wait_for_completion(sm: StateChart, timeout_s: float = 5.0): + """Poll the processing loop until the SM reaches a final state or times out.""" + import asyncio + + deadline = time.monotonic() + timeout_s + while not sm.is_terminated and time.monotonic() < deadline: + await asyncio.sleep(0.02) + await sm._engine.processing_loop() + + @pytest.mark.asyncio() async def test_scxml_usecase_async(testcase_path: Path, should_generate_debug_diagram, caplog): sm = _run_scxml_testcase( @@ -76,4 +97,5 @@ async def test_scxml_usecase_async(testcase_path: Path, should_generate_debug_di # In async context, the engine only queued __initial__ during __init__. # Activate now within the running event loop. await sm.activate_initial_state() + await _async_wait_for_completion(sm) _assert_passed(sm) diff --git a/tests/test_scxml_units.py b/tests/test_scxml_units.py index 66a23a74..aced9921 100644 --- a/tests/test_scxml_units.py +++ b/tests/test_scxml_units.py @@ -59,11 +59,13 @@ def test_no_scxml_element_raises(self): class TestParseState: - def test_state_without_id_raises(self): - """State element without id attribute raises ValueError.""" + def test_state_without_id_gets_auto_generated(self): + """State element without id attribute gets an auto-generated id.""" xml = '' - with pytest.raises(ValueError, match="State must have an 'id' attribute"): - parse_scxml(xml) + definition = parse_scxml(xml) + state_ids = list(definition.states.keys()) + assert len(state_ids) == 1 + assert state_ids[0].startswith("__auto_") class TestParseHistory: From e11eaa6c4a4bccc80b5a3595684d68842e9de982 Mon Sep 17 00:00:00 2001 From: Fernando Macedo Date: Fri, 20 Feb 2026 22:57:09 -0300 Subject: [PATCH 2/6] feat(scxml): implement #_ send target and block-level error catching for transition content MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement two fixes for SCXML W3C tests 192 and 253: 1. Add #_ send target support — the SCXML convention for parent-to-child event routing. Events are forwarded through InvokeManager.send_to_child() to the child handler's on_event(). Unreachable targets queue error.communication. 2. Wrap transition `on` content with on_error so errors are caught per-block (SCXML spec §5.12.1). Previously, errors in transition actions caused full microstep rollback; now the transition completes and error.execution is queued separately. During error.execution processing, on_error is disabled for transition content to prevent infinite loops in self-transition error handlers. Also includes: - Decouple SCXMLInvoker from processor (takes base_dir + register_child) - Add invoke_init callback for invoked child machines - Thread leak detection fixture and interruptible wait fixes - Unit tests for _send_to_invoke and SCXMLInvoker - Documentation updates for invoke, error handling, and release notes --- AGENTS.md | 38 ++++- docs/invoke.md | 32 ++-- docs/processing_model.md | 6 +- docs/releases/3.0.0.md | 16 +- docs/statecharts.md | 26 ++- pyproject.toml | 2 + statemachine/engines/async_.py | 57 ++++++- statemachine/engines/base.py | 59 ++++++- statemachine/engines/sync.py | 29 +++- statemachine/invoke.py | 93 ++++++++++- statemachine/io/scxml/actions.py | 87 +++++++--- statemachine/io/scxml/invoke.py | 57 ++++--- statemachine/io/scxml/processor.py | 44 +++-- statemachine/statemachine.py | 2 - tests/conftest.py | 39 +++++ tests/scxml/conftest.py | 48 +++--- tests/test_async.py | 7 +- tests/test_error_execution.py | 42 +++-- tests/test_invoke.py | 5 +- tests/test_scxml_units.py | 254 +++++++++++++++++++++++++++++ 20 files changed, 789 insertions(+), 154 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index bb2914b3..8fbf1fba 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -44,13 +44,22 @@ The engine follows the SCXML run-to-completion (RTC) model with two processing l ### Error handling (`error_on_execution`) - `StateChart` has `error_on_execution=True` by default; `StateMachine` has `False`. -- Errors are caught at the **block level** (per onentry/onexit block), not per microstep. -- This means `after` callbacks still run even when an action raises — making `after_()` - a natural **finalize** hook (runs on both success and failure paths). +- Errors are caught at the **block level** (per onentry/onexit/transition `on` block), not per + microstep. This means `after` callbacks still run even when an action raises — making + `after_()` a natural **finalize** hook (runs on both success and failure paths). - `error.execution` is dispatched as an internal event; define transitions for it to handle errors within the statechart. - Error during `error.execution` handling → ignored to prevent infinite loops. +#### `on_error` asymmetry: transition `on` vs onentry/onexit + +Transition `on` content uses `on_error` **only for non-`error.execution` events**. During +`error.execution` processing, `on_error` is disabled for transition `on` content — errors +propagate to `microstep()` where `_send_error_execution` ignores them. This prevents infinite +loops in self-transition error handlers (e.g., `error_execution = s1.to(s1, on="handler")` +where `handler` raises). `onentry`/`onexit` blocks always use `on_error` regardless of the +current event. + ### Eventless transitions - Bare transition statements (not assigned to a variable) are **eventless** — they fire @@ -68,6 +77,21 @@ The engine follows the SCXML run-to-completion (RTC) model with two processing l - `on_error_execution()` works via naming convention but **only** when a transition for `error.execution` is declared — it is NOT a generic callback. +### Invoke (``) + +- `invoke.py` — `InvokeManager` on the engine manages the lifecycle: `mark_for_invoke()`, + `cancel_for_state()`, `spawn_pending_sync/async()`, `send_to_child()`. +- `_cleanup_terminated()` only removes invocations that are both terminated **and** cancelled. + A terminated-but-not-cancelled invocation means the handler's `run()` returned but the owning + state is still active — it must stay in `_active` so `send_to_child()` can still route events. +- **Child machine constructor blocks** in the processing loop. Use a listener pattern (e.g., + `_ChildRefSetter`) to capture the child reference during the first `on_enter_state`, before + the loop spins. +- `#_` send target: routed via `_send_to_invoke()` in `io/scxml/actions.py` → + `InvokeManager.send_to_child()` → handler's `on_event()`. +- **Tests with blocking threads**: use `threading.Event.wait(timeout=)` instead of + `time.sleep()` for interruptible waits — avoids thread leak errors in teardown. + ## Environment setup ```bash @@ -77,11 +101,11 @@ pre-commit install ## Running tests -Always use `uv` to run commands: +Always use `uv` to run commands. Also, use a timeout to avoid being stuck in the case of a leaked thread or infinite loop: ```bash # Run all tests (parallel) -uv run pytest -n auto +timeout 120 uv run pytest -n 4 # Run a specific test file uv run pytest tests/test_signature.py @@ -98,9 +122,11 @@ Don't specify the directory `tests/`, because this will exclude doctests from bo (`--doctest-glob=*.md`) (enabled by default): ```bash -uv run pytest -n auto +timeout 120 uv run pytest -n 4 ``` +Testes normally run under 60s (~40s on average), so take a closer look if they take longer, it can be a regression. + Coverage is enabled by default. ### Testing both sync and async engines diff --git a/docs/invoke.md b/docs/invoke.md index 62d01537..a3b162cf 100644 --- a/docs/invoke.md +++ b/docs/invoke.md @@ -456,24 +456,28 @@ is cancelled. Pass a `StateChart` subclass to spawn a child machine: -```python -from statemachine import State, StateChart +```py +>>> class ChildMachine(StateChart): +... start = State(initial=True) +... end = State(final=True) +... go = start.to(end) +... +... def on_enter_start(self, **kwargs): +... self.send("go") + +>>> class ParentMachine(StateChart): +... loading = State(initial=True, invoke=ChildMachine) +... ready = State(final=True) +... done_invoke_loading = loading.to(ready) -class ChildMachine(StateChart): - start = State(initial=True) - end = State(final=True) - go = start.to(end) +>>> sm = ParentMachine() +>>> time.sleep(0.2) - def on_enter_start(self, **kwargs): - self.send("go") +>>> "ready" in sm.configuration_values +True -class ParentMachine(StateChart): - loading = State(initial=True, invoke=ChildMachine) - ready = State(final=True) - done_invoke_loading = loading.to(ready) ``` The child machine is instantiated and run when the parent's `loading` state is entered. When the child terminates (reaches a final state), a `done.invoke` event is sent to the -parent, triggering the `done_invoke_loading` transition. See -`tests/test_invoke.py::TestInvokeStateChartChild` for a working example. +parent, triggering the `done_invoke_loading` transition. diff --git a/docs/processing_model.md b/docs/processing_model.md index f4698afe..9acd1f1a 100644 --- a/docs/processing_model.md +++ b/docs/processing_model.md @@ -111,8 +111,10 @@ and executes them atomically: If an error occurs during steps 1–4 and `error_on_execution` is enabled, the error is caught at the **block level** — meaning remaining actions in that block are skipped, but -the microstep continues and `after` callbacks still run (see -{ref}`cleanup / finalize pattern `). +the microstep continues and `after` callbacks still run. Each phase (exit, `on`, enter) +is an independent block, so an error in the transition `on` action does not prevent target +states from being entered. See {ref}`block-level error catching ` and the +{ref}`cleanup / finalize pattern `. ### Macrostep diff --git a/docs/releases/3.0.0.md b/docs/releases/3.0.0.md index 89973cd7..d315e402 100644 --- a/docs/releases/3.0.0.md +++ b/docs/releases/3.0.0.md @@ -83,6 +83,10 @@ machines can receive context at creation time: ``` +Invoke also supports child state machines (pass a `StateChart` subclass) and SCXML +`` with ``, autoforward, and `#_` / `#_parent` send targets +for parent-child communication. + See {ref}`invoke` for full documentation. ### Compound states @@ -336,6 +340,11 @@ True ``` +Errors are caught at the **block level**: each microstep phase (exit, transition `on`, +enter) is an independent block. An error in one block does not prevent subsequent blocks +from executing — in particular, `after` callbacks always run, making `after_()` a +natural finalize hook. See {ref}`block-level error catching `. + The error object is available as `error` in handler kwargs. See {ref}`error-execution` for full details. @@ -504,11 +513,8 @@ TODO. The following SCXML features are **not yet implemented** and are deferred to a future release: -- `` — invoking external services or sub-machines from within a state -- HTTP and other external communication targets -- `` — processing data returned from invoked services - -These features are tracked for v3.1+. +- HTTP and other external communication targets (only `#_internal`, `#_parent`, and + `#_` send targets are supported) ```{seealso} For a step-by-step migration guide with before/after examples, see diff --git a/docs/statecharts.md b/docs/statecharts.md index b95d5420..e254d345 100644 --- a/docs/statecharts.md +++ b/docs/statecharts.md @@ -213,12 +213,36 @@ If an error occurs while processing the `error.execution` event itself, the engi ignores the second error (logging a warning) to prevent infinite loops. The state machine remains in the configuration it was in before the failed error handler. +### Block-level error catching + +`StateChart` catches errors at the **block level**, not the microstep level. +Each phase of the microstep — `on_exit`, transition `on` content, `on_enter` — is an +independent block. An error in one block: + +- **Stops remaining actions in that block** (per SCXML spec, execution MUST NOT continue + within the same block after an error). +- **Does not affect other blocks** — subsequent phases of the microstep still execute. + In particular, `after` callbacks always run regardless of errors in earlier blocks. + +This means that even if a transition's `on` action raises an exception, the transition +completes: target states are entered and `after_()` callbacks still run. The error +is caught and queued as an `error.execution` internal event, which can be handled by a +separate transition. + +```{note} +During `error.execution` processing, errors in transition `on` content are **not** caught +at block level — they propagate to the microstep, where they are silently ignored. This +prevents infinite loops when an error handler's own action raises (e.g., a self-transition +`error_execution = s1.to(s1, on="handler")` where `handler` raises). Entry/exit blocks +always use block-level error catching regardless of the current event. +``` + ### Cleanup / finalize pattern A common need is to run cleanup code after a transition **regardless of success or failure** — for example, releasing a lock or closing a resource. -Because `StateChart` catches errors at the **block level** (not the microstep level), +Because `StateChart` catches errors at the **block level** (see above), `after_()` callbacks still run even when an action raises an exception. This makes `after_()` a natural **finalize** hook — no need to duplicate cleanup logic in an error handler. diff --git a/pyproject.toml b/pyproject.toml index 7e30186d..39b24f15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,6 +90,8 @@ python_files = ["tests.py", "test_*.py", "*_tests.py"] xfail_strict = true log_cli = true log_cli_level = "DEBUG" +log_cli_format = "%(relativeCreated)6.0fms %(threadName)-18s %(name)-35s %(message)s" +log_cli_date_format = "%H:%M:%S" asyncio_default_fixture_loop_scope = "module" [tool.coverage.run] diff --git a/statemachine/engines/async_.py b/statemachine/engines/async_.py index 08cea579..836dca5b 100644 --- a/statemachine/engines/async_.py +++ b/statemachine/engines/async_.py @@ -13,6 +13,7 @@ from ..exceptions import TransitionNotAllowed from ..orderedset import OrderedSet from ..state import State +from .base import _ERROR_EXECUTION from .base import BaseEngine if TYPE_CHECKING: @@ -178,6 +179,7 @@ async def _exit_states( # type: ignore[override] args, kwargs = await self._get_args_kwargs(info.transition, trigger_data) if info.state is not None: # pragma: no branch + logger.debug("%s Exiting state: %s", self._log_id, info.state) await self.sm._callbacks.async_call( info.state.exit.key, *args, on_error=on_error, **kwargs ) @@ -198,10 +200,24 @@ async def _enter_states( # noqa: C901 self._prepare_entry_states(enabled_transitions, states_to_exit, previous_configuration) ) + # For transition 'on' content, use on_error only for non-error.execution + # events. During error.execution processing, errors in transition content + # must propagate to microstep() where _send_error_execution's guard + # prevents infinite loops (per SCXML spec: errors during error event + # processing are ignored). + on_error_transition = on_error + if ( + on_error is not None + and trigger_data.event + and str(trigger_data.event) == _ERROR_EXECUTION + ): + on_error_transition = None + result = await self._execute_transition_content( enabled_transitions, trigger_data, lambda t: t.on.key, + on_error=on_error_transition, previous_configuration=previous_configuration, new_configuration=new_configuration, ) @@ -218,7 +234,7 @@ async def _enter_states( # noqa: C901 target=target, ) - logger.debug("Entering state: %s", target) + logger.debug("%s Entering state: %s", self._log_id, target) self._add_state_to_configuration(target) on_entry_result = await self.sm._callbacks.async_call( @@ -257,6 +273,14 @@ async def _enter_states( # noqa: C901 return result async def microstep(self, transitions: "List[Transition]", trigger_data: TriggerData): + self._microstep_count += 1 + logger.debug( + "%s macro:%d micro:%d transitions: %s", + self._log_id, + self._macrostep_count, + self._microstep_count, + transitions, + ) previous_configuration = self.sm.configuration try: result = await self._execute_transition_content( @@ -342,7 +366,7 @@ async def processing_loop( # noqa: C901 return None _ctx_token = _in_processing_loop.set(True) - logger.debug("Processing loop started: %s", self.sm.current_state_value) + logger.debug("%s Processing loop started: %s", self._log_id, self.sm.current_state_value) first_result = self._sentinel try: took_events = True @@ -353,7 +377,12 @@ async def processing_loop( # noqa: C901 # Phase 1: eventless transitions and internal events while not macrostep_done: - logger.debug("Macrostep: eventless/internal queue") + self._microstep_count = 0 + logger.debug( + "%s Macrostep %d: eventless/internal queue", + self._log_id, + self._macrostep_count, + ) self.clear_cache() internal_event = TriggerData(self.sm, event=None) # null object for eventless @@ -365,7 +394,9 @@ async def processing_loop( # noqa: C901 internal_event = self.internal_queue.pop() enabled_transitions = await self.select_transitions(internal_event) if enabled_transitions: - logger.debug("Enabled transitions: %s", enabled_transitions) + logger.debug( + "%s Enabled transitions: %s", self._log_id, enabled_transitions + ) took_events = True await self._run_microstep(enabled_transitions, internal_event) @@ -380,7 +411,9 @@ async def processing_loop( # noqa: C901 await self._run_microstep(enabled_transitions, internal_event) # Phase 3: external events - logger.debug("Macrostep: external queue") + logger.debug( + "%s Macrostep %d: external queue", self._log_id, self._macrostep_count + ) while not self.external_queue.is_empty(): self.clear_cache() took_events = True @@ -393,7 +426,14 @@ async def processing_loop( # noqa: C901 # transitions can be processed while we wait. break - logger.debug("External event: %s", external_event.event) + self._macrostep_count += 1 + self._microstep_count = 0 + logger.debug( + "%s macrostep %d: event=%s", + self._log_id, + self._macrostep_count, + external_event.event, + ) # Handle lazy initial state activation. # Break out of phase 3 so the outer loop restarts from phase 1 @@ -412,7 +452,9 @@ async def processing_loop( # noqa: C901 event_future = external_event.future try: enabled_transitions = await self.select_transitions(external_event) - logger.debug("Enabled transitions: %s", enabled_transitions) + logger.debug( + "%s Enabled transitions: %s", self._log_id, enabled_transitions + ) if enabled_transitions: result = await self.microstep( list(enabled_transitions), external_event @@ -451,6 +493,7 @@ async def processing_loop( # noqa: C901 _in_processing_loop.reset(_ctx_token) self._processing.release() + logger.debug("%s Processing loop ended", self._log_id) result = first_result if first_result is not self._sentinel else None # If the caller has a future, await it (already resolved by now). if caller_future is not None: diff --git a/statemachine/engines/base.py b/statemachine/engines/base.py index f1c341d2..0186926b 100644 --- a/statemachine/engines/base.py +++ b/statemachine/engines/base.py @@ -96,6 +96,9 @@ def __init__(self, sm: "StateChart"): self._processing = Lock() self._cache: Dict = {} # Cache for _get_args_kwargs results self._invoke_manager = InvokeManager(self) + self._macrostep_count: int = 0 + self._microstep_count: int = 0 + self._log_id = f"[{type(sm).__name__}]" def empty(self): # pragma: no cover return self.external_queue.is_empty() @@ -122,7 +125,8 @@ def put(self, trigger_data: TriggerData, internal: bool = False, _delayed: bool if not _delayed: logger.debug( - "New event '%s' put on the '%s' queue", + "%s New event '%s' put on the '%s' queue", + self._log_id, trigger_data.event, "internal" if internal else "external", ) @@ -175,7 +179,12 @@ def _send_error_execution(self, error: Exception, trigger_data: TriggerData): If already processing an error.execution event, ignore to avoid infinite loops. """ - logger.debug("Error %s captured while executing event=%s", error, trigger_data.event) + logger.debug( + "%s Error %s captured while executing event=%s", + self._log_id, + error, + trigger_data.event, + ) if trigger_data.event and str(trigger_data.event) == _ERROR_EXECUTION: logger.warning("Error while processing error.execution, ignoring: %s", error) return @@ -371,6 +380,14 @@ def microstep(self, transitions: List[Transition], trigger_data: TriggerData): """Process a single set of transitions in a 'lock step'. This includes exiting states, executing transition content, and entering states. """ + self._microstep_count += 1 + logger.debug( + "%s macro:%d micro:%d transitions: %s", + self._log_id, + self._macrostep_count, + self._microstep_count, + transitions, + ) previous_configuration = self.sm.configuration try: result = self._execute_transition_content( @@ -451,7 +468,7 @@ def _prepare_exit_states( states_to_exit, key=lambda x: x.state and x.state.document_order or 0, reverse=True ) result = OrderedSet([info.state for info in ordered_states if info.state]) - logger.debug("States to exit: %s", result) + logger.debug("%s States to exit: %s", self._log_id, result) # Update history for info in ordered_states: @@ -463,7 +480,8 @@ def _prepare_exit_states( history_value = [s for s in self.sm.configuration if s.parent == state] logger.debug( - "Saving '%s.%s' history state: '%s'", + "%s Saving '%s.%s' history state: '%s'", + self._log_id, state, history, [s.id for s in history_value], @@ -493,6 +511,7 @@ def _exit_states( # Execute `onexit` handlers — same per-block error isolation as onentry. if info.state is not None: # pragma: no branch + logger.debug("%s Exiting state: %s", self._log_id, info.state) self.sm._callbacks.call(info.state.exit.key, *args, on_error=on_error, **kwargs) self._remove_state_from_configuration(info.state) @@ -549,7 +568,7 @@ def _prepare_entry_states( new_configuration = cast( OrderedSet[State], (previous_configuration - states_to_exit) | states_targets_to_enter ) - logger.debug("States to enter: %s", states_targets_to_enter) + logger.debug("%s States to enter: %s", self._log_id, states_targets_to_enter) return ordered_states, states_for_default_entry, default_history_content, new_configuration @@ -558,9 +577,17 @@ def _add_state_to_configuration(self, target: State): if not self.sm.atomic_configuration_update: self.sm.configuration |= {target} + def __del__(self): + try: + self._invoke_manager.cancel_all() + except Exception: + pass + def _handle_final_state(self, target: State, on_entry_result: list): """Handle final state entry: queue done events. No direct callback dispatch.""" + logger.debug("%s Reached final state: %s", self._log_id, target) if target.parent is None: + self._invoke_manager.cancel_all() self.running = False else: parent = target.parent @@ -601,10 +628,24 @@ def _enter_states( # noqa: C901 self._prepare_entry_states(enabled_transitions, states_to_exit, previous_configuration) ) + # For transition 'on' content, use on_error only for non-error.execution + # events. During error.execution processing, errors in transition content + # must propagate to microstep() where _send_error_execution's guard + # prevents infinite loops (per SCXML spec: errors during error event + # processing are ignored). + on_error_transition = on_error + if ( + on_error is not None + and trigger_data.event + and str(trigger_data.event) == _ERROR_EXECUTION + ): + on_error_transition = None + result = self._execute_transition_content( enabled_transitions, trigger_data, lambda t: t.on.key, + on_error=on_error_transition, previous_configuration=previous_configuration, new_configuration=new_configuration, ) @@ -621,7 +662,7 @@ def _enter_states( # noqa: C901 target=target, ) - logger.debug("Entering state: %s", target) + logger.debug("%s Entering state: %s", self._log_id, target) self._add_state_to_configuration(target) # Execute `onentry` handlers — each handler is a separate block per @@ -722,7 +763,8 @@ def add_descendant_states_to_enter( # noqa: C901 default_history_content[parent_id] = [info] if state.id in self.sm.history_values: logger.debug( - "History state '%s.%s' %s restoring: '%s'", + "%s History state '%s.%s' %s restoring: '%s'", + self._log_id, state.parent, state, "deep" if state.deep else "shallow", @@ -751,7 +793,8 @@ def add_descendant_states_to_enter( # noqa: C901 else: # Handle default history content logger.debug( - "History state '%s.%s' default content: %s", + "%s History state '%s.%s' default content: %s", + self._log_id, state.parent, state, [t.target.id for t in state.transitions if t.target], diff --git a/statemachine/engines/sync.py b/statemachine/engines/sync.py index 787a2469..d2f97342 100644 --- a/statemachine/engines/sync.py +++ b/statemachine/engines/sync.py @@ -77,7 +77,7 @@ def processing_loop(self, caller_future=None): # noqa: C901 # We will collect the first result as the processing result to keep backwards compatibility # so we need to use a sentinel object instead of `None` because the first result may # be also `None`, and on this case the `first_result` may be overridden by another result. - logger.debug("Processing loop started: %s", self.sm.current_state_value) + logger.debug("%s Processing loop started: %s", self._log_id, self.sm.current_state_value) first_result = self._sentinel try: took_events = True @@ -91,7 +91,12 @@ def processing_loop(self, caller_future=None): # noqa: C901 # handles eventless transitions and internal events while not macrostep_done: - logger.debug("Macrostep: eventless/internal queue") + self._microstep_count = 0 + logger.debug( + "%s Macrostep %d: eventless/internal queue", + self._log_id, + self._macrostep_count, + ) self.clear_cache() internal_event = TriggerData( @@ -105,7 +110,9 @@ def processing_loop(self, caller_future=None): # noqa: C901 internal_event = self.internal_queue.pop() enabled_transitions = self.select_transitions(internal_event) if enabled_transitions: - logger.debug("Enabled transitions: %s", enabled_transitions) + logger.debug( + "%s Enabled transitions: %s", self._log_id, enabled_transitions + ) took_events = True self._run_microstep(enabled_transitions, internal_event) @@ -122,7 +129,9 @@ def processing_loop(self, caller_future=None): # noqa: C901 self._run_microstep(enabled_transitions, internal_event) # Process external events - logger.debug("Macrostep: external queue") + logger.debug( + "%s Macrostep %d: external queue", self._log_id, self._macrostep_count + ) while not self.external_queue.is_empty(): self.clear_cache() took_events = True @@ -135,13 +144,20 @@ def processing_loop(self, caller_future=None): # noqa: C901 # transitions can be processed while we wait. break - logger.debug("External event: %s", external_event.event) + self._macrostep_count += 1 + self._microstep_count = 0 + logger.debug( + "%s macrostep %d: event=%s", + self._log_id, + self._macrostep_count, + external_event.event, + ) # Finalize + autoforward for active invocations self._invoke_manager.handle_external_event(external_event) enabled_transitions = self.select_transitions(external_event) - logger.debug("Enabled transitions: %s", enabled_transitions) + logger.debug("%s Enabled transitions: %s", self._log_id, enabled_transitions) if enabled_transitions: try: result = self.microstep(list(enabled_transitions), external_event) @@ -160,6 +176,7 @@ def processing_loop(self, caller_future=None): # noqa: C901 finally: self._processing.release() + logger.debug("%s Processing loop ended", self._log_id) return first_result if first_result is not self._sentinel else None def enabled_events(self, *args, **kwargs): diff --git a/statemachine/invoke.py b/statemachine/invoke.py index b66632f7..7ea8e5d9 100644 --- a/statemachine/invoke.py +++ b/statemachine/invoke.py @@ -47,6 +47,18 @@ class IInvoke(Protocol): def run(self, ctx: "InvokeContext") -> Any: ... # pragma: no branch +def _stop_child_machine(child: "StateChart | None") -> None: + """Stop a child state machine and cancel all its invocations.""" + if child is None: + return + logger.debug("invoke: stopping child machine %s", type(child).__name__) + try: + child._engine.running = False + child._engine._invoke_manager.cancel_all() + except Exception: + logger.debug("Error stopping child machine", exc_info=True) + + class _InvokeCallableWrapper: """Wraps an IInvoke class/instance or StateChart class for the callback system. @@ -185,8 +197,7 @@ def run(self, _ctx: "InvokeContext") -> Any: return None def on_cancel(self): - # Child machine cleanup — currently a no-op since sync machines - # run to completion in the constructor. + _stop_child_machine(self._child) self._child = None @@ -224,13 +235,17 @@ def run(self, ctx: "InvokeContext") -> "List[Any]": self._cancel_remaining() raise finally: + # Normal exit: all futures completed, safe to shutdown without waiting. self._executor.shutdown(wait=False) return results def on_cancel(self): + # Called from the engine thread — must not block. Cancel pending futures + # and signal shutdown; the invoke thread's run() will detect ctx.cancelled + # and exit, then _cancel()'s thread.join() waits for the actual cleanup. self._cancel_remaining() if self._executor is not None: - self._executor.shutdown(wait=False) + self._executor.shutdown(wait=False, cancel_futures=True) def _cancel_remaining(self): for future in self._futures: @@ -287,20 +302,44 @@ def mark_for_invoke(self, state: "State", event_kwargs: "dict | None" = None): def cancel_for_state(self, state: "State"): """Called by ``_exit_states()`` before exiting a state.""" + logger.debug("invoke cancel_for_state: %s", state.id) for inv_id, inv in list(self._active.items()): - if inv.state_id == state.id and not inv.terminated: + if inv.state_id == state.id and not inv.ctx.cancelled.is_set(): self._cancel(inv_id) self._pending = [(s, kw) for s, kw in self._pending if s.id != state.id] + # Don't cleanup here — terminated invocations must stay in _active + # so that handle_external_event can still run finalize blocks for + # done.invoke events that are already queued. def cancel_all(self): """Cancel all active invocations.""" + logger.debug("invoke cancel_all: %d active", len(self._active)) for inv_id in list(self._active.keys()): self._cancel(inv_id) + self._cleanup_terminated() + + def _cleanup_terminated(self): + """Remove invocations whose threads/tasks have actually finished. + + Only removes invocations that are both terminated AND cancelled. + A terminated-but-not-cancelled invocation means the handler's ``run()`` + has returned but the owning state is still active — the invocation must + stay in ``_active`` so that ``send_to_child()`` can still forward events + to it (e.g. ````). + """ + self._active = { + inv_id: inv + for inv_id, inv in self._active.items() + if not inv.terminated or not inv.ctx.cancelled.is_set() + } # --- Sync spawning --- def spawn_pending_sync(self): """Spawn invoke handlers for all states marked for invocation (sync engine).""" + # Opportunistically clean up finished invocations before spawning new ones. + self._cleanup_terminated() + pending = sorted(self._pending, key=lambda p: p[0].document_order) self._pending.clear() for state, event_kwargs in pending: @@ -323,6 +362,7 @@ def _spawn_one_sync(self, callback: "CallbackWrapper", **kwargs): invocation._handler = handler self._active[ctx.invokeid] = invocation + logger.debug("invoke spawn sync: %s on state %s", ctx.invokeid, state.id) thread = threading.Thread( target=self._run_sync_handler, @@ -354,11 +394,17 @@ def _run_sync_handler( self.sm.send("error.execution", error=e) finally: invocation.terminated = True + logger.debug( + "invoke %s: completed (cancelled=%s)", ctx.invokeid, ctx.cancelled.is_set() + ) # --- Async spawning --- async def spawn_pending_async(self): """Spawn invoke handlers for all states marked for invocation (async engine).""" + # Opportunistically clean up finished invocations before spawning new ones. + self._cleanup_terminated() + pending = sorted(self._pending, key=lambda p: p[0].document_order) self._pending.clear() for state, event_kwargs in pending: @@ -379,6 +425,7 @@ def _spawn_one_async(self, callback: "CallbackWrapper", **kwargs): invocation._handler = handler self._active[ctx.invokeid] = invocation + logger.debug("invoke spawn async: %s on state %s", ctx.invokeid, state.id) loop = asyncio.get_running_loop() task = loop.create_task(self._run_async_handler(callback, handler, ctx, invocation)) @@ -415,23 +462,57 @@ async def _run_async_handler( self.sm.send("error.execution", error=e) finally: invocation.terminated = True + logger.debug( + "invoke %s: completed (cancelled=%s)", ctx.invokeid, ctx.cancelled.is_set() + ) # --- Cancel --- def _cancel(self, invokeid: str): invocation = self._active.get(invokeid) - if not invocation or invocation.terminated: + if not invocation or invocation.ctx.cancelled.is_set(): return + + logger.debug("invoke cancel: %s", invokeid) + # 1) Signal cancellation so the handler can check and stop early. invocation.ctx.cancelled.set() + + # 2) Notify the handler (may stop child SMs, cancel futures, etc.). handler = invocation._handler if handler is not None and hasattr(handler, "on_cancel"): try: handler.on_cancel() except Exception: logger.debug("Error in on_cancel for %s", invokeid, exc_info=True) + + # 3) Cancel the async task (raises CancelledError at next await). if invocation.task is not None and not invocation.task.done(): invocation.task.cancel() + # 4) Wait for the sync thread to actually finish (skip if we ARE + # that thread — e.g. done.invoke processed from within the handler). + if ( + invocation.thread is not None + and invocation.thread is not threading.current_thread() + and invocation.thread.is_alive() + ): + invocation.thread.join(timeout=2.0) + + def send_to_child(self, invokeid: str, event: str, **data) -> bool: + """Send an event to an invoked child session by its invokeid. + + Returns True if the event was forwarded, False if the invocation was + not found or doesn't support event forwarding. + """ + invocation = self._active.get(invokeid) + if invocation is None: + return False + handler = invocation._handler + if handler is not None and hasattr(handler, "on_event"): + handler.on_event(event, **data) + return True + return False + # --- Helpers --- def handle_external_event(self, trigger_data) -> None: @@ -470,11 +551,13 @@ def handle_external_event(self, trigger_data) -> None: # Only forward if the invocation is still running. if ( not inv.terminated + and not inv.ctx.cancelled.is_set() and not is_from_child and hasattr(handler, "autoforward") and handler.autoforward and hasattr(handler, "on_event") ): + logger.debug("invoke autoforward: %s -> %s", event_name, inv.invokeid) handler.on_event(event_name, **trigger_data.kwargs) def _make_context( diff --git a/statemachine/io/scxml/actions.py b/statemachine/io/scxml/actions.py index db409344..5b7cb5ca 100644 --- a/statemachine/io/scxml/actions.py +++ b/statemachine/io/scxml/actions.py @@ -109,9 +109,15 @@ class EventDataWrapper: Otherwise it MUST leave it blank. """ - def __init__(self, event_data): + def __init__(self, event_data=None, *, trigger_data=None): self.event_data = event_data - self.trigger_data = event_data.trigger_data + if trigger_data is not None: + self.trigger_data = trigger_data + elif event_data is not None: + self.trigger_data = event_data.trigger_data + else: + raise ValueError("Either event_data or trigger_data must be provided") + td = self.trigger_data self.sendid = td.send_id self.invokeid = td.kwargs.get("_invokeid", "") @@ -127,21 +133,7 @@ def __init__(self, event_data): @classmethod def from_trigger_data(cls, trigger_data): """Create an EventDataWrapper directly from a TriggerData (no EventData needed).""" - obj = cls.__new__(cls) - obj.event_data = None - obj.sendid = trigger_data.send_id - obj.trigger_data = trigger_data - obj.invokeid = trigger_data.kwargs.get("_invokeid", "") - event = trigger_data.event - if event is None or event.internal: - if "error.execution" == event: - obj.type = "platform" - else: - obj.type = "internal" - obj.origintype = "" - else: - obj.type = "external" - return obj + return cls(trigger_data=trigger_data) def __getattr__(self, name): if self.event_data is not None: @@ -152,22 +144,16 @@ def __eq__(self, value): "This makes SCXML test 329 pass. It assumes that the event is the same instance" return isinstance(value, EventDataWrapper) - @property - def _trigger_data(self): - if self.event_data is not None: - return self.event_data.trigger_data - return self.trigger_data - @property def name(self): if self.event_data is not None: return self.event_data.event - return str(self._trigger_data.event) if self._trigger_data.event else None + return str(self.trigger_data.event) if self.trigger_data.event else None @property def data(self): "Property used by the SCXML namespace" - td = self._trigger_data + td = self.trigger_data if td.kwargs: return _Data(td.kwargs) elif td.args and len(td.args) == 1: @@ -405,6 +391,10 @@ def _send_to_parent(action: SendAction, **kwargs): machine = kwargs["machine"] session = getattr(machine, "_invoke_session", None) if session is None: + logger.warning( + " ignored: machine %r has no _invoke_session", + machine.name, + ) return event = action.event or _eval(action.eventexpr, **kwargs) # type: ignore[arg-type] names = [] @@ -420,6 +410,25 @@ def _send_to_parent(action: SendAction, **kwargs): session.send_to_parent(event, **params_values) +def _send_to_invoke(action: SendAction, invokeid: str, **kwargs): + """Route a to the invoked child session.""" + machine: StateChart = kwargs["machine"] + event = action.event or _eval(action.eventexpr, **kwargs) # type: ignore[arg-type] + names = [] + for name in (action.namelist or "").strip().split(): + if not hasattr(machine.model, name): + raise NameError(f"Namelist variable '{name}' not found on model") + names.append(Param(name=name, expr=name)) + params_values = {} + for param in chain(names, action.params): + if param.expr is None: + continue + params_values[param.name] = _eval(param.expr, **kwargs) + if not machine._engine._invoke_manager.send_to_child(invokeid, event, **params_values): + # Per SCXML spec: if target is not reachable → error.communication + machine.send("error.communication", internal=True) + + def create_send_action_callable(action: SendAction) -> Callable: # noqa: C901 content: Any = () _valid_targets = (None, "#_internal", "internal", "#_parent", "parent") @@ -444,6 +453,9 @@ def send_action(*args, **kwargs): # noqa: C901 if target and target.startswith("#_scxml_"): # Valid SCXML session reference but undispatchable → error.communication machine.send("error.communication", internal=True) + elif target and target.startswith("#_"): + # #_ → route to invoked child session + _send_to_invoke(action, target[2:], **kwargs) else: # Invalid target expression → error.execution (raised as exception) raise ValueError(f"Invalid target: {target}. Must be one of {_valid_targets}") @@ -521,6 +533,30 @@ def script_action(*args, **kwargs): return script_action +def create_invoke_init_callable() -> Callable: + """Create a callback that extracts invoke-specific kwargs and stores them on the machine. + + This is always inserted at position 0 in the initial state's onentry list by the + SCXML processor, so that ``_invoke_session`` and ``_invoke_params`` are handled + before any other callbacks run — even for SMs without a ````. + """ + initialized = False + + def invoke_init(*args, **kwargs): + nonlocal initialized + if initialized: + return + initialized = True + machine = kwargs.get("machine") + if machine is not None: + # Use get() not pop(): each callback receives a copy of kwargs + # (via EventData.extended_kwargs), so pop would be misleading. + machine._invoke_params = kwargs.get("_invoke_params") + machine._invoke_session = kwargs.get("_invoke_session") + + return invoke_init + + def _create_dataitem_callable(action: DataItem) -> Callable: def data_initializer(**kwargs): machine: StateChart = kwargs["machine"] @@ -565,6 +601,7 @@ def datamodel(*args, **kwargs): if initialized: return initialized = True + for act in data_elements: act(**kwargs) diff --git a/statemachine/io/scxml/invoke.py b/statemachine/io/scxml/invoke.py index 2a5005da..1b4b355b 100644 --- a/statemachine/io/scxml/invoke.py +++ b/statemachine/io/scxml/invoke.py @@ -7,10 +7,9 @@ """ import logging -import os from pathlib import Path -from typing import TYPE_CHECKING from typing import Any +from typing import Callable from ...invoke import IInvoke from ...invoke import InvokeContext @@ -18,9 +17,6 @@ from .actions import _eval from .schema import InvokeDefinition -if TYPE_CHECKING: - from .processor import SCXMLProcessor - logger = logging.getLogger(__name__) _VALID_INVOKE_TYPES = { @@ -42,12 +38,13 @@ class SCXMLInvoker: def __init__( self, definition: InvokeDefinition, - processor: "SCXMLProcessor", + base_dir: str, + register_child: "Callable[[str, str], type]", ): self._definition = definition - self._processor = processor + self._register_child = register_child self._child: Any = None - self._base_dir: str = os.getcwd() + self._base_dir: str = base_dir # Duck-typed attributes for InvokeManager self.invoke_id: "str | None" = definition.id @@ -88,21 +85,30 @@ def run(self, ctx: InvokeContext) -> Any: # Parse and create the child machine child_cls = self._create_child_class(scxml_content, ctx.invokeid) - # Create child machine with param overrides and parent session reference. - # _invoke_session must be passed as a kwarg so it's available during - # the constructor (the child SM runs in __init__). + # _invoke_session and _invoke_params are passed as kwargs so that the + # invoke_init callback (inserted at position 0 in the initial state's onentry + # by the processor) can pop them and store them on the machine instance. + # + # The _ChildRefSetter listener captures ``self._child`` during the first + # state entry, before the processing loop blocks. This is necessary + # because the child's ``__init__`` may block for an extended time when + # there are delayed events, and ``on_event()`` needs access to the child + # to forward events from the parent session. session = _InvokeSession(parent=machine, invokeid=ctx.invokeid) + ref_setter = _ChildRefSetter(self) self._child = child_cls( _invoke_params=invoke_params, _invoke_session=session, + listeners=[ref_setter], ) - # Wait for child to reach final state (it already ran in constructor) - # The child sends events to parent via #_parent routing. return None def on_cancel(self): - """Cancel the child machine.""" + """Cancel the child machine and all its invocations.""" + from ...invoke import _stop_child_machine + + _stop_child_machine(self._child) self._child = None def on_event(self, event_name: str, **data): @@ -186,12 +192,25 @@ def _evaluate_params(self, machine) -> dict: def _create_child_class(self, scxml_content: str, invokeid: str): """Parse the child SCXML and create a machine class.""" - from .parser import parse_scxml - child_name = f"invoke_{invokeid}" - definition = parse_scxml(scxml_content) - self._processor.process_definition(definition, location=child_name) - return self._processor.scs[child_name] + return self._register_child(scxml_content, child_name) + + +class _ChildRefSetter: + """Listener that captures the child machine reference during initialization. + + The child's ``__init__`` blocks inside the processing loop (e.g. when there + are delayed events). By using this listener, ``SCXMLInvoker._child`` is set + during the first state entry — *before* the processing loop starts spinning — + so that ``on_event()`` can forward events to the child immediately. + """ + + def __init__(self, invoker: "SCXMLInvoker"): + self._invoker = invoker + + def on_enter_state(self, machine=None, **kwargs): + if self._invoker._child is None and machine is not None: + self._invoker._child = machine class _InvokeSession: diff --git a/statemachine/io/scxml/processor.py b/statemachine/io/scxml/processor.py index 38f3856d..844eb591 100644 --- a/statemachine/io/scxml/processor.py +++ b/statemachine/io/scxml/processor.py @@ -19,6 +19,7 @@ from .actions import EventDataWrapper from .actions import ExecuteBlock from .actions import create_datamodel_action_callable +from .actions import create_invoke_init_callable from .invoke import SCXMLInvoker from .parser import parse_scxml from .schema import HistoryState @@ -65,7 +66,7 @@ def __post_init__(self): class SCXMLProcessor: def __init__(self): - self.scs = {} + self.scs: "Dict[str, type[StateChart]]" = {} self.sessions: Dict[str, SessionData] = {} self._ioprocessors = { "http://www.w3.org/TR/scxml/#SCXMLEventProcessor": self, @@ -81,25 +82,34 @@ def parse_scxml(self, sm_name: str, scxml_content: str): definition = parse_scxml(scxml_content) self.process_definition(definition, location=definition.name or sm_name) - def process_definition(self, definition, location: str): + def process_definition(self, definition, location: str, is_invoked: bool = False): states_dict = self._process_states(definition.states) + # Find the initial state for inserting init callbacks + try: + initial_state = next(s for s in iter(states_dict.values()) if s.get("initial")) + except StopIteration: + initial_state = next(iter(states_dict.values())) + + if "enter" not in initial_state: + initial_state["enter"] = [] + + insert_pos = 0 + + # For invoked children, insert invoke_init to pop _invoke_session/_invoke_params + # from kwargs and store them on the machine instance before any other callbacks. + if is_invoked: + initial_state["enter"].insert(0, create_invoke_init_callable()) # type: ignore[union-attr] + insert_pos = 1 + # Process datamodel (initial variables) if definition.datamodel: datamodel = create_datamodel_action_callable(definition.datamodel) if datamodel: # pragma: no branch – parse_datamodel guarantees non-empty - try: - initial_state = next(s for s in iter(states_dict.values()) if s.get("initial")) - except StopIteration: - # If there's no explicit initial state, use the first one - initial_state = next(iter(states_dict.values())) - - if "enter" not in initial_state: - initial_state["enter"] = [] if isinstance( # pragma: no branch – always a list from lines above initial_state["enter"], list ): - initial_state["enter"].insert(0, datamodel) # type: ignore[arg-type] + initial_state["enter"].insert(insert_pos, datamodel) # type: ignore[arg-type] self._add( location, @@ -201,7 +211,17 @@ def _process_state(self, state: State) -> StateDefinition: # noqa: C901 def _process_invocation(self, invoke_def: InvokeDefinition) -> SCXMLInvoker: """Convert an InvokeDefinition into an SCXMLInvoker.""" - return SCXMLInvoker(definition=invoke_def, processor=self) + return SCXMLInvoker( + definition=invoke_def, + base_dir=os.getcwd(), + register_child=self._register_child, + ) + + def _register_child(self, scxml_content: str, child_name: str) -> type: + """Parse SCXML content, register it as a child machine, and return its class.""" + definition = parse_scxml(scxml_content) + self.process_definition(definition, location=child_name, is_invoked=True) + return self.scs[child_name] def _process_transitions(self, transitions: List[Transition]): result: TransitionsList = [] diff --git a/statemachine/statemachine.py b/statemachine/statemachine.py index ddc9f971..84518b7a 100644 --- a/statemachine/statemachine.py +++ b/statemachine/statemachine.py @@ -142,8 +142,6 @@ def __init__( **kwargs: Any, ): self.model: TModel = model if model is not None else Model() # type: ignore[assignment] - self._invoke_params: "dict | None" = kwargs.pop("_invoke_params", None) - self._invoke_session: Any = kwargs.pop("_invoke_session", None) self.history_values: Dict[ str, List[State] ] = {} # Mapping of compound states to last active state(s). diff --git a/tests/conftest.py b/tests/conftest.py index 647e811b..9c5e83cd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import asyncio +import threading import time from datetime import datetime @@ -291,3 +292,41 @@ async def sleep(self, seconds: float): def sm_runner(request): """Fixture that runs tests on both sync and async engines.""" return SMRunner(is_async=request.param == "async") + + +@pytest.fixture(autouse=True) +def _check_leaked_threads(): + """Detect threads leaked by test cases (e.g. invoke daemon threads). + + Snapshots active threads before the test, yields, then checks for any new + threads still alive after teardown. Leaked threads are joined with a + timeout and reported as a test failure. + """ + before = set(threading.enumerate()) + yield + + new_threads = set(threading.enumerate()) - before + if not new_threads: + return + + # Filter out asyncio event loop threads (managed by pytest-asyncio, not by us). + new_threads = {t for t in new_threads if not t.name.startswith("asyncio_")} + if not new_threads: + return + + # Give ephemeral threads (e.g. executor workers) a chance to finish. + for t in new_threads: + t.join(timeout=2.0) + + leaked = [t for t in new_threads if t.is_alive()] + if not leaked: + return + + details: list[str] = [] + for t in leaked: + details.append(f" - {t.name!r} (daemon={t.daemon}, ident={t.ident})") + + pytest.fail( + f"Test leaked {len(leaked)} thread(s) still alive after join:\n" + "\n".join(details), + pytrace=False, + ) diff --git a/tests/scxml/conftest.py b/tests/scxml/conftest.py index 3aab408d..a09fdcce 100644 --- a/tests/scxml/conftest.py +++ b/tests/scxml/conftest.py @@ -8,30 +8,30 @@ # xfail sets — tests that fail identically on both engines XFAIL_BOTH = { # mandatory — invoke-related (still failing) - "test187", - "test192", - "test229", - "test236", - "test240", - "test253", - "test554", - # optional - "test201", - "test446", - "test509", - "test510", - "test518", - "test519", - "test520", - "test522", - "test531", - "test532", - "test534", - "test557", - "test558", - "test561", - "test567", - "test577", + "test187", # delayed cancelled when sending session terminates before delay + "test229", # autoforward: parent forwards events to child automatically + "test236", # done.invoke.id arrives after all other child-generated events + "test240", # datamodel values passed to invoked child via namelist and + "test554", # invocation cancelled when evaluation of invoke arguments errors + # optional — ecmascript/JSON datamodel + "test201", # JSON data in parsed in ecmascript datamodel + "test446", # JSON data loaded via src attribute parsed as array + # optional — Basic HTTP Event I/O Processor + "test509", # basic HTTP event I/O processor: send with target + "test510", # basic HTTP event I/O processor: send without target + "test518", # basic HTTP event I/O processor: event field in POST + "test519", # basic HTTP event I/O processor: namelist data in POST body + "test520", # basic HTTP event I/O processor: data in POST body + "test522", # basic HTTP event I/O processor: in POST body + "test531", # basic HTTP event I/O processor: POST response populates _event.data + "test532", # basic HTTP event I/O processor: error.communication on bad target + "test534", # basic HTTP event I/O processor: #_scxml_sessionid target + # optional — data/content handling + "test557", # XML data in content becomes DOM-like object (python datamodel) + "test558", # text data in preserves string type (python datamodel) + "test561", # XML content in events creates DOM object + "test567", # HTTP message parameters populate _event.data + "test577", # without target causes error.communication } XFAIL_SYNC_ONLY: set[str] = set() XFAIL_ASYNC_ONLY: set[str] = set() diff --git a/tests/test_async.py b/tests/test_async.py index 326aa977..b742170b 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -236,11 +236,14 @@ async def test_async_error_on_execution_in_transition(): class SM(StateChart): s1 = State(initial=True) - s2 = State(final=True) + s2 = State() error_state = State(final=True) go = s1.to(s2, on="bad_action") - error_execution = s1.to(error_state) + finish = s2.to(error_state) + # Transition 'on' content error is caught per-block, so the transition + # completes to s2. error.execution fires from s2. + error_execution = s1.to(error_state) | s2.to(error_state) def bad_action(self, **kwargs): raise RuntimeError("Transition boom") diff --git a/tests/test_error_execution.py b/tests/test_error_execution.py index c1fd73b0..e99f2eec 100644 --- a/tests/test_error_execution.py +++ b/tests/test_error_execution.py @@ -84,10 +84,15 @@ class ErrorInErrorHandlerSC(StateChart): """Error in error.execution handler should not cause infinite loop.""" s1 = State("s1", initial=True) - s2 = State("s2", final=True) + s2 = State("s2") + s3 = State("s3", final=True) go = s1.to(s2, on="bad_action") - error_execution = Event(s1.to(s1, on="bad_error_handler"), id="error.execution") + finish = s2.to(s3) + error_execution = Event( + s1.to(s1, on="bad_error_handler") | s2.to(s2, on="bad_error_handler"), + id="error.execution", + ) def bad_action(self): raise RuntimeError("action failed") @@ -174,12 +179,15 @@ def test_error_in_error_handler_no_infinite_loop(): sm = ErrorInErrorHandlerSC() assert sm.configuration == {sm.s1} - # bad_action raises -> error.execution fires -> bad_error_handler raises - # Second error during error.execution processing is ignored (logged as warning) + # bad_action raises -> caught per-block, transition completes to s2 -> + # error.execution fires -> bad_error_handler raises during error.execution + # processing -> rolled back, second error ignored (logged as warning) sm.send("go") - # Machine should still be in s1 (rolled back from failed transition) - assert sm.configuration == {sm.s1} + # Transition 'on' content error is caught per-block (SCXML spec), + # so the transition s1->s2 completes. error.execution fires from s2, + # bad_error_handler raises, which is ignored during error.execution. + assert sm.configuration == {sm.s2} def test_statemachine_with_error_on_execution_true(): @@ -456,7 +464,9 @@ def struggle(self): sm = OneRingTemptation() sm.send("tempt") - # Error in error handler is ignored, machine stays in carrying + # resist raises -> caught per-block, self-transition completes (carrying) -> + # error.execution fires -> struggle raises during error.execution -> + # rolled back, second error ignored -> stays in carrying assert sm.configuration == {sm.carrying} def test_multiple_source_states_with_convention(self): @@ -585,12 +595,11 @@ def kindle(self): sm = BeaconOfGondor() sm.send("light_beacon") - # error.communication.failed won't match error.execution, but - # error_communication_failed will match "error_communication_failed" - # The engine sends "error.execution" which does NOT match - # "error_communication_failed" or "error.communication.failed". - # So the error is unhandled and silently ignored (StateChart default). - assert sm.configuration == {sm.waiting} + # Transition 'on' content error is caught per-block (SCXML spec), + # so waiting->lit completes. error.execution fires from lit, but + # error_communication_failed does NOT match error.execution. + # Error is unhandled and silently ignored (StateChart default). + assert sm.configuration == {sm.lit} def test_multiple_errors_sequential(self): """Multiple events that fail are each handled by error.execution.""" @@ -698,7 +707,8 @@ def test_error_in_on_callback_of_error_handler_is_ignored(self): """If the `on` callback of error.execution raises, the second error is ignored. Per SCXML spec: errors during error.execution processing must not recurse. - The machine should roll back to the configuration before the failed error handler. + During error.execution, transition 'on' content errors propagate to + microstep(), which rolls back and ignores the second error. """ class MountDoom(StateChart): @@ -717,7 +727,9 @@ def gollum_intervenes(self): sm = MountDoom() sm.send("ascend") - # Error in error handler is ignored, config rolled back to climbing + # slip raises -> caught per-block, self-transition completes (climbing) -> + # error.execution fires -> gollum_intervenes raises during error.execution -> + # rolled back to climbing, second error ignored assert sm.configuration == {sm.climbing} def test_condition_on_error_transition_routes_to_different_states(self): diff --git a/tests/test_invoke.py b/tests/test_invoke.py index fb2fce31..9905eeb3 100644 --- a/tests/test_invoke.py +++ b/tests/test_invoke.py @@ -420,9 +420,11 @@ def on_enter_error_state(self, **kwargs): async def test_group_cancel_on_exit(self, sm_runner): """Cancellation propagates: exiting state stops the group.""" + cancel_flag = threading.Event() def slow_task(): - time.sleep(5.0) + # Use interruptible wait so thread can exit promptly on cancellation. + cancel_flag.wait(timeout=5.0) return "should not complete" class SM(StateChart): @@ -433,6 +435,7 @@ class SM(StateChart): sm = await sm_runner.start(SM) await sm_runner.sleep(0.05) await sm_runner.send(sm, "cancel") + cancel_flag.set() # Unblock the slow_task thread await sm_runner.sleep(0.1) assert "stopped" in sm.configuration_values diff --git a/tests/test_scxml_units.py b/tests/test_scxml_units.py index aced9921..6d1daa10 100644 --- a/tests/test_scxml_units.py +++ b/tests/test_scxml_units.py @@ -1,5 +1,6 @@ """Unit tests for SCXML parser, actions, and schema modules.""" +import logging import xml.etree.ElementTree as ET from unittest.mock import Mock @@ -8,13 +9,16 @@ from statemachine.io.scxml.actions import ParseTime from statemachine.io.scxml.actions import create_action_callable from statemachine.io.scxml.actions import create_datamodel_action_callable +from statemachine.io.scxml.invoke import SCXMLInvoker from statemachine.io.scxml.parser import parse_element from statemachine.io.scxml.parser import parse_scxml from statemachine.io.scxml.parser import strip_namespaces from statemachine.io.scxml.schema import CancelAction from statemachine.io.scxml.schema import DataModel from statemachine.io.scxml.schema import IfBranch +from statemachine.io.scxml.schema import InvokeDefinition from statemachine.io.scxml.schema import LogAction +from statemachine.io.scxml.schema import Param # --- ParseTime --- @@ -355,3 +359,253 @@ def test_history_without_transitions(self): processor.parse_scxml("test_history_no_trans", scxml) sm = processor.start() assert sm.states_map["a"] in sm.configuration + + +# --- SCXMLInvoker --- + + +def _make_invoker(definition=None, base_dir="/tmp", register_child=None): + """Helper to create an SCXMLInvoker with sensible defaults.""" + if definition is None: + definition = InvokeDefinition() + if register_child is None: + register_child = Mock(return_value=Mock) + return SCXMLInvoker( + definition=definition, + base_dir=base_dir, + register_child=register_child, + ) + + +class TestSCXMLInvoker: + def test_invalid_invoke_type_raises(self): + """run() raises ValueError for unsupported invoke type.""" + defn = InvokeDefinition( + type="http://unsupported/type", + content="", + ) + invoker = _make_invoker(definition=defn) + ctx = Mock() + model = Mock(spec=[]) + ctx.machine = Mock(model=model) + + with pytest.raises(ValueError, match="Unsupported invoke type"): + invoker.run(ctx) + + def test_no_content_resolved_raises(self): + """run() raises ValueError when no src/content/srcexpr is provided.""" + defn = InvokeDefinition() # no content, src, or srcexpr + invoker = _make_invoker(definition=defn) + ctx = Mock() + model = Mock(spec=[]) + ctx.machine = Mock(model=model) + + with pytest.raises(ValueError, match="No content resolved"): + invoker.run(ctx) + + def test_resolve_content_inline_xml(self): + """_resolve_content returns inline XML content directly.""" + xml_content = '' + defn = InvokeDefinition(content=xml_content) + invoker = _make_invoker(definition=defn) + + result = invoker._resolve_content(Mock()) + assert result == xml_content + + def test_resolve_content_from_file(self, tmp_path): + """_resolve_content reads content from src file path.""" + scxml_file = tmp_path / "child.scxml" + scxml_file.write_text("") + + defn = InvokeDefinition(src="child.scxml") + invoker = _make_invoker(definition=defn, base_dir=str(tmp_path)) + + result = invoker._resolve_content(Mock()) + assert result == "" + + def test_evaluate_params_namelist_and_params(self): + """_evaluate_params resolves both namelist variables and param elements.""" + defn = InvokeDefinition( + namelist="var1 var2", + params=[Param(name="p1", expr="42")], + ) + invoker = _make_invoker(definition=defn) + + model = type("Model", (), {"var1": "a", "var2": "b"})() + machine = Mock(model=model) + + result = invoker._evaluate_params(machine) + assert result == {"var1": "a", "var2": "b", "p1": 42} + + def test_on_cancel_clears_child(self): + """on_cancel() sets _child to None.""" + invoker = _make_invoker() + invoker._child = Mock() + + invoker.on_cancel() + assert invoker._child is None + + def test_on_event_skips_terminated_child(self): + """on_event() does not error when child is terminated.""" + invoker = _make_invoker() + child = Mock() + child.is_terminated = True + invoker._child = child + + # Should not raise or call send + invoker.on_event("some.event") + child.send.assert_not_called() + + def test_on_finalize_without_block_is_noop(self): + """on_finalize() does nothing when no finalize block is defined.""" + invoker = _make_invoker() + assert invoker._finalize_block is None + + # Should not raise + trigger_data = Mock() + invoker.on_finalize(trigger_data) + + def test_send_to_parent_warns_without_session(self, caplog): + """_send_to_parent logs a warning when machine has no _invoke_session.""" + from statemachine.io.scxml.actions import _send_to_parent + from statemachine.io.scxml.parser import SendAction + + action = SendAction(event="done", target="#_parent") + machine = Mock(spec=[]) # spec=[] ensures no _invoke_session attribute + machine.name = "test_machine" + + with caplog.at_level(logging.WARNING, logger="statemachine.io.scxml.actions"): + _send_to_parent(action, machine=machine) + + assert "no _invoke_session" in caplog.text + + +# --- _send_to_invoke --- + + +class TestSendToInvoke: + """Unit tests for _send_to_invoke (routes ).""" + + def _make_machine_with_invoke_manager(self, send_to_child_return=True): + """Create a mock machine with an InvokeManager that has send_to_child.""" + machine = Mock() + machine.model = Mock() + machine.model.__dict__ = {} + machine._engine._invoke_manager.send_to_child.return_value = send_to_child_return + return machine + + def test_routes_event_to_child(self): + """_send_to_invoke forwards the event to InvokeManager.send_to_child.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + action = SendAction(event="childEvent", target="#_child1") + + _send_to_invoke(action, "child1", machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with( + "child1", "childEvent" + ) + machine.send.assert_not_called() + + def test_sends_error_communication_when_child_not_found(self): + """_send_to_invoke sends error.communication when invokeid is not found.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager(send_to_child_return=False) + action = SendAction(event="childEvent", target="#_unknown") + + _send_to_invoke(action, "unknown", machine=machine) + + machine.send.assert_called_once_with("error.communication", internal=True) + + def test_evaluates_eventexpr(self): + """_send_to_invoke evaluates eventexpr when event is None.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + action = SendAction(event=None, eventexpr="'dynamic_event'", target="#_child1") + + _send_to_invoke(action, "child1", machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with( + "child1", "dynamic_event" + ) + + def test_forwards_params(self): + """_send_to_invoke forwards evaluated params to send_to_child.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + action = SendAction( + event="childEvent", + target="#_child1", + params=[Param(name="x", expr="42"), Param(name="y", expr="'hello'")], + ) + + _send_to_invoke(action, "child1", machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with( + "child1", "childEvent", x=42, y="hello" + ) + + def test_forwards_namelist_variables(self): + """_send_to_invoke resolves namelist variables from model and forwards them.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + model = type("Model", (), {})() + model.var1 = "alpha" + model.var2 = "beta" + machine.model = model + action = SendAction(event="childEvent", target="#_child1", namelist="var1 var2") + + _send_to_invoke(action, "child1", machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with( + "child1", "childEvent", var1="alpha", var2="beta" + ) + + def test_namelist_missing_variable_raises(self): + """_send_to_invoke raises NameError when namelist variable is not on model.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + machine.model = Mock(spec=[]) # no attributes + action = SendAction(event="childEvent", target="#_child1", namelist="missing_var") + + with pytest.raises(NameError, match="missing_var"): + _send_to_invoke(action, "child1", machine=machine) + + def test_send_action_callable_routes_invoke_target(self): + """create_send_action_callable routes #_ targets to _send_to_invoke.""" + from statemachine.io.scxml.actions import create_send_action_callable + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + action = SendAction(event="hello", target="#_myinvoke") + send_callable = create_send_action_callable(action) + + send_callable(machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with("myinvoke", "hello") + + def test_send_action_callable_scxml_session_target(self): + """create_send_action_callable sends error.communication for #_scxml_ targets.""" + from statemachine.io.scxml.actions import create_send_action_callable + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + action = SendAction(event="hello", target="#_scxml_session123") + send_callable = create_send_action_callable(action) + + send_callable(machine=machine) + + machine.send.assert_called_once_with("error.communication", internal=True) + machine._engine._invoke_manager.send_to_child.assert_not_called() From 6c9499391163f84abd419d80afaba779eb89800e Mon Sep 17 00:00:00 2001 From: Fernando Macedo Date: Fri, 20 Feb 2026 23:10:19 -0300 Subject: [PATCH 3/6] docs(invoke): explain why error.execution uses the external queue Invoke handlers run in background threads, outside the processing loop. Using the internal queue would either contaminate an unrelated macrostep or stall indefinitely. This matches done.invoke, which also uses the external queue for the same reason. --- statemachine/invoke.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/statemachine/invoke.py b/statemachine/invoke.py index 7ea8e5d9..68eae851 100644 --- a/statemachine/invoke.py +++ b/statemachine/invoke.py @@ -391,6 +391,12 @@ def _run_sync_handler( ) except Exception as e: if not ctx.cancelled.is_set(): + # Intentionally using the external queue (no internal=True): + # This handler runs in a background thread, outside the processing + # loop. Using the internal queue would either contaminate an + # unrelated macrostep in progress, or stall if no macrostep is + # active (the internal queue is only drained within a macrostep). + # This matches done.invoke, which also uses the external queue. self.sm.send("error.execution", error=e) finally: invocation.terminated = True @@ -459,6 +465,7 @@ async def _run_async_handler( return except Exception as e: if not ctx.cancelled.is_set(): + # External queue — see comment in _run_sync_handler. self.sm.send("error.execution", error=e) finally: invocation.terminated = True From fd7b4bdba0c10bf7478b23be436c0ca6ec58ee2d Mon Sep 17 00:00:00 2001 From: Fernando Macedo Date: Fri, 20 Feb 2026 23:38:00 -0300 Subject: [PATCH 4/6] test: improve coverage for invoke, SCXML actions, and async engine Add 19 tests covering uncovered lines in modified files: - EventDataWrapper edge cases (no args, __getattr__, name via trigger_data) - _send_to_parent namelist errors and param expr=None skip - _send_to_invoke param without expr skip - invoke_init idempotent behavior - SCXMLInvoker: on_event exception, non-string content, param location - Parser: child XML/text, text content - InvokeManager: send_to_child not found / no on_event, null event - _stop_child_machine exception handling - BaseEngine.__del__ cancel_all exception - Async engine error in before callbacks Also fix _make_invoker to not hardcode /tmp, and document coverage report commands in AGENTS.md. --- AGENTS.md | 14 +- tests/test_async.py | 23 ++++ tests/test_invoke.py | 80 ++++++++++++ tests/test_scxml_units.py | 261 +++++++++++++++++++++++++++++++++++++- 4 files changed, 376 insertions(+), 2 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 8fbf1fba..e3321431 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -127,7 +127,19 @@ timeout 120 uv run pytest -n 4 Testes normally run under 60s (~40s on average), so take a closer look if they take longer, it can be a regression. -Coverage is enabled by default. +Coverage is enabled by default (`--cov` is in `pyproject.toml`'s `addopts`). To generate a +coverage report to a file, pass `--cov-report` **in addition to** `--cov`: + +```bash +# JSON report (machine-readable, includes missing_lines per file) +timeout 120 uv run pytest -n auto --cov=statemachine --cov-report=json:cov.json + +# Terminal report with missing lines +timeout 120 uv run pytest -n auto --cov=statemachine --cov-report=term-missing +``` + +Note: `--cov=statemachine` is required to activate coverage collection; `--cov-report` +alone only changes the output format. ### Testing both sync and async engines diff --git a/tests/test_async.py b/tests/test_async.py index b742170b..03a717e5 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -273,6 +273,29 @@ def after_go(self, **kwargs): assert sm.configuration == {sm.error_state} +@pytest.mark.timeout(5) +async def test_async_error_on_execution_in_before(): + """Async engine catches errors in before callbacks with error_on_execution.""" + + class SM(StateChart): + s1 = State(initial=True) + error_state = State(final=True) + + go = s1.to(s1) + error_execution = s1.to(error_state) + + def before_go(self, **kwargs): + raise RuntimeError("Before boom") + + async def on_enter_state(self, **kwargs): + """Async callback to force the async engine.""" + + sm = SM() + await sm.activate_initial_state() + await sm.go() + assert sm.configuration == {sm.error_state} + + @pytest.mark.timeout(5) async def test_async_invalid_definition_in_transition_propagates(): """InvalidDefinition in async transition propagates.""" diff --git a/tests/test_invoke.py b/tests/test_invoke.py index 9905eeb3..87d6fa20 100644 --- a/tests/test_invoke.py +++ b/tests/test_invoke.py @@ -990,3 +990,83 @@ def on_invoke_loading(self, ctx=None, **kwargs): await sm_runner.processing_loop(sm) assert "loading" in sm.configuration_values + + +class TestInvokeManagerUnit: + """Unit tests for InvokeManager methods not exercised by integration tests.""" + + def test_send_to_child_not_found(self): + """send_to_child returns False when invokeid is not in _active.""" + from unittest.mock import Mock + + from statemachine.invoke import InvokeManager + + engine = Mock() + manager = InvokeManager(engine) + + assert manager.send_to_child("nonexistent", "event") is False + + def test_send_to_child_handler_without_on_event(self): + """send_to_child returns False when handler has no on_event.""" + from unittest.mock import Mock + + from statemachine.invoke import Invocation + from statemachine.invoke import InvokeContext + from statemachine.invoke import InvokeManager + + engine = Mock() + manager = InvokeManager(engine) + + handler = Mock(spec=[]) # no on_event + ctx = InvokeContext(invokeid="test_id", state_id="s1", send=Mock(), machine=Mock()) + inv = Invocation(invokeid="test_id", state_id="s1", ctx=ctx, _handler=handler) + manager._active["test_id"] = inv + + assert manager.send_to_child("test_id", "event") is False + + def test_handle_external_event_none_event(self): + """handle_external_event returns early when event is None.""" + from unittest.mock import Mock + + from statemachine.invoke import InvokeManager + + engine = Mock() + manager = InvokeManager(engine) + + trigger_data = Mock(event=None) + # Should not raise + manager.handle_external_event(trigger_data) + + +class TestStopChildMachine: + """Tests for _stop_child_machine.""" + + def test_stop_child_machine_exception_swallowed(self): + """_stop_child_machine swallows exceptions during stop.""" + from unittest.mock import Mock + + from statemachine.invoke import _stop_child_machine + + child = Mock() + child._engine.running = True + child._engine._invoke_manager.cancel_all.side_effect = RuntimeError("boom") + + # Should not raise + _stop_child_machine(child) + + +class TestEngineDelCleanup: + """Test BaseEngine.__del__ cancel_all exception handling.""" + + def test_del_swallows_cancel_all_exception(self): + """__del__ swallows exceptions from cancel_all.""" + + class SM(StateChart): + s1 = State(initial=True, final=True) + + sm = SM() + engine = sm._engine + engine._invoke_manager.cancel_all = lambda: (_ for _ in ()).throw(RuntimeError("boom")) + + # Should not raise + engine.__del__() diff --git a/tests/test_scxml_units.py b/tests/test_scxml_units.py index 6d1daa10..c9c4f579 100644 --- a/tests/test_scxml_units.py +++ b/tests/test_scxml_units.py @@ -5,6 +5,7 @@ from unittest.mock import Mock import pytest +from statemachine.io.scxml.actions import EventDataWrapper from statemachine.io.scxml.actions import Log from statemachine.io.scxml.actions import ParseTime from statemachine.io.scxml.actions import create_action_callable @@ -364,10 +365,12 @@ def test_history_without_transitions(self): # --- SCXMLInvoker --- -def _make_invoker(definition=None, base_dir="/tmp", register_child=None): +def _make_invoker(definition=None, base_dir=None, register_child=None): """Helper to create an SCXMLInvoker with sensible defaults.""" if definition is None: definition = InvokeDefinition() + if base_dir is None: + base_dir = "" if register_child is None: register_child = Mock(return_value=Mock) return SCXMLInvoker( @@ -609,3 +612,259 @@ def test_send_action_callable_scxml_session_target(self): machine.send.assert_called_once_with("error.communication", internal=True) machine._engine._invoke_manager.send_to_child.assert_not_called() + + +# --- EventDataWrapper coverage --- + + +class TestEventDataWrapperEdgeCases: + def test_no_event_data_no_trigger_data_raises(self): + """EventDataWrapper raises ValueError when neither is provided.""" + with pytest.raises(ValueError, match="Either event_data or trigger_data"): + EventDataWrapper() + + def test_getattr_with_event_data_delegates(self): + """__getattr__ delegates to event_data when present.""" + event_data = Mock() + event_data.trigger_data = Mock( + kwargs={}, send_id=None, event=Mock(internal=True, __str__=lambda s: "test") + ) + event_data.some_custom_attr = "custom_value" + wrapper = EventDataWrapper(event_data) + assert wrapper.some_custom_attr == "custom_value" + + def test_getattr_without_event_data_raises(self): + """__getattr__ raises AttributeError when event_data is None.""" + trigger_data = Mock(kwargs={}, send_id=None, event=Mock(internal=True)) + trigger_data.event.__str__ = lambda s: "test" + wrapper = EventDataWrapper(trigger_data=trigger_data) + with pytest.raises(AttributeError, match="no attribute 'missing_attr'"): + wrapper.missing_attr # noqa: B018 + + def test_name_via_trigger_data(self): + """name property returns event string from trigger_data when no event_data.""" + trigger_data = Mock(kwargs={}, send_id=None, event=Mock(internal=True)) + trigger_data.event.__str__ = lambda s: "my.event" + wrapper = EventDataWrapper(trigger_data=trigger_data) + assert wrapper.name == "my.event" + + +# --- _send_to_parent coverage --- + + +class TestSendToParentParams: + def test_send_to_parent_with_namelist_and_params(self): + """_send_to_parent resolves namelist and params before sending.""" + from statemachine.io.scxml.actions import _send_to_parent + from statemachine.io.scxml.parser import SendAction + + model = type("Model", (), {})() + model.myvar = "hello" + machine = Mock(model=model) + machine.model.__dict__ = {"myvar": "hello"} + session = Mock() + machine._invoke_session = session + + action = SendAction( + event="childDone", + target="#_parent", + namelist="myvar", + params=[Param(name="extra", expr="42")], + ) + + _send_to_parent(action, machine=machine) + + session.send_to_parent.assert_called_once_with("childDone", myvar="hello", extra=42) + + def test_send_to_parent_namelist_missing_raises(self): + """_send_to_parent raises NameError when namelist variable is missing.""" + from statemachine.io.scxml.actions import _send_to_parent + from statemachine.io.scxml.parser import SendAction + + machine = Mock() + machine.model = Mock(spec=[]) # no attributes + machine._invoke_session = Mock() + + action = SendAction(event="ev", target="#_parent", namelist="missing_var") + + with pytest.raises(NameError, match="missing_var"): + _send_to_parent(action, machine=machine) + + def test_send_to_parent_param_without_expr_skipped(self): + """_send_to_parent skips params where expr is None.""" + from statemachine.io.scxml.actions import _send_to_parent + from statemachine.io.scxml.parser import SendAction + + machine = Mock() + machine.model = Mock() + machine.model.__dict__ = {} + session = Mock() + machine._invoke_session = session + + action = SendAction( + event="ev", + target="#_parent", + params=[ + Param(name="has_expr", expr="1"), + Param(name="no_expr", expr=None), + ], + ) + + _send_to_parent(action, machine=machine) + session.send_to_parent.assert_called_once_with("ev", has_expr=1) + + +# --- _send_to_invoke param skip coverage --- + + +class TestSendToInvokeParamSkip: + def test_param_without_expr_is_skipped(self): + """_send_to_invoke skips params where expr is None.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = Mock() + machine.model = Mock() + machine.model.__dict__ = {} + machine._engine._invoke_manager.send_to_child.return_value = True + + action = SendAction( + event="ev", + target="#_child", + params=[ + Param(name="with_expr", expr="1"), + Param(name="no_expr", expr=None), + ], + ) + + _send_to_invoke(action, "child", machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with( + "child", "ev", with_expr=1 + ) + + +# --- invoke_init coverage --- + + +class TestInvokeInitCallback: + def test_invoke_init_idempotent(self): + """invoke_init only runs once, even if called multiple times.""" + from statemachine.io.scxml.actions import create_invoke_init_callable + + callback = create_invoke_init_callable() + machine = Mock() + + callback(machine=machine) + assert machine._invoke_params is not None or True # first call sets attrs + + # Reset to detect second call + machine._invoke_params = "first" + callback(machine=machine) + # Should NOT have been overwritten + assert machine._invoke_params == "first" + + +# --- SCXMLInvoker edge cases --- + + +class TestSCXMLInvokerEdgeCases: + def test_on_event_exception_in_child_send(self): + """on_event swallows exceptions from child.send().""" + invoker = _make_invoker() + child = Mock() + child.is_terminated = False + child.send.side_effect = RuntimeError("child error") + invoker._child = child + + # Should not raise + invoker.on_event("some.event") + child.send.assert_called_once_with("some.event") + + def test_resolve_content_expr_non_string(self): + """_resolve_content converts non-string eval result to string.""" + defn = InvokeDefinition(content="42") # evaluates to int + invoker = _make_invoker(definition=defn) + machine = Mock() + machine.model.__dict__ = {} + + result = invoker._resolve_content(machine) + assert result == "42" + + def test_evaluate_params_with_location(self): + """_evaluate_params resolves param with location instead of expr.""" + defn = InvokeDefinition( + params=[Param(name="p1", expr=None, location="myvar")], + ) + invoker = _make_invoker(definition=defn) + + model = type("Model", (), {})() + model.myvar = "resolved" + machine = Mock(model=model) + machine.model.__dict__ = {"myvar": "resolved"} + + result = invoker._evaluate_params(machine) + assert result == {"p1": "resolved"} + + +# --- Parser edge cases --- + + +class TestParserAssignChildXml: + def test_assign_with_child_xml_content(self): + """ with child XML content is parsed as child_xml.""" + scxml = """ + + + + + + + + + + + + + """ + # Should parse without error — the child XML is stored in child_xml + definition = parse_scxml(scxml) + # Verify it parsed states correctly + assert "s1" in definition.states + + def test_assign_with_text_content(self): + """ with text content (no expr attr) uses text as expr.""" + scxml = """ + + + + + + + 42 + + + + + + """ + definition = parse_scxml(scxml) + assert "s1" in definition.states + + +class TestParserInvokeContent: + def test_invoke_with_text_content(self): + """ with text body is parsed.""" + scxml = """ + + + + some text content + + + + """ + definition = parse_scxml(scxml) + assert "s1" in definition.states + invoke_def = definition.states["s1"].invocations[0] + assert "some text content" in invoke_def.content From 9d02dc2fef9125ad542b863bce6f65e7005585de Mon Sep 17 00:00:00 2001 From: Fernando Macedo Date: Fri, 20 Feb 2026 23:57:10 -0300 Subject: [PATCH 5/6] test: improve branch coverage for invoke, SCXML parser, and orderedset --- tests/test_scxml_units.py | 150 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) diff --git a/tests/test_scxml_units.py b/tests/test_scxml_units.py index c9c4f579..e3f0e7b3 100644 --- a/tests/test_scxml_units.py +++ b/tests/test_scxml_units.py @@ -868,3 +868,153 @@ def test_invoke_with_text_content(self): assert "s1" in definition.states invoke_def = definition.states["s1"].invocations[0] assert "some text content" in invoke_def.content + + def test_invoke_with_content_expr(self): + """ is parsed as dynamic content.""" + scxml = """ + + + + + + + + """ + definition = parse_scxml(scxml) + invoke_def = definition.states["s1"].invocations[0] + assert invoke_def.content == "'dynamic'" + + def test_invoke_with_inline_scxml_no_namespace(self): + """ with inline (no namespace) is parsed.""" + scxml = """ + + + + + + + + """ + definition = parse_scxml(scxml) + invoke_def = definition.states["s1"].invocations[0] + assert " with empty results in content=None.""" + scxml = """ + + + + + + + + """ + definition = parse_scxml(scxml) + invoke_def = definition.states["s1"].invocations[0] + assert invoke_def.content is None + + def test_invoke_with_finalize_block(self): + """ with block is parsed.""" + scxml = """ + + + + child content + + + + + + + """ + definition = parse_scxml(scxml) + invoke_def = definition.states["s1"].invocations[0] + assert invoke_def.finalize is not None + assert len(invoke_def.finalize.actions) == 1 + + +class TestParserAssignEdgeCases: + def test_assign_without_children_or_text(self): + """ with neither children nor text results in expr=None.""" + scxml = """ + + + + + + + + + + + + + """ + definition = parse_scxml(scxml) + assert "s1" in definition.states + + +class TestSCXMLInvokerResolveContentAbsolutePath: + def test_resolve_content_absolute_path(self, tmp_path): + """_resolve_content with absolute src path doesn't prepend base_dir.""" + scxml_file = tmp_path / "child.scxml" + scxml_file.write_text("") + + defn = InvokeDefinition(src=str(scxml_file)) + invoker = _make_invoker(definition=defn, base_dir="/some/other/dir") + + result = invoker._resolve_content(Mock()) + assert result == "" + + +class TestSCXMLInvokerEvaluateParamsNoExprNoLocation: + def test_param_without_expr_or_location_skipped(self): + """_evaluate_params skips params with neither expr nor location.""" + defn = InvokeDefinition( + params=[Param(name="p1", expr=None, location=None)], + ) + invoker = _make_invoker(definition=defn) + machine = Mock(model=type("M", (), {})()) + machine.model.__dict__ = {} + + result = invoker._evaluate_params(machine) + assert result == {} + + +class TestInvokeInitMachineNone: + def test_invoke_init_without_machine_is_noop(self): + """invoke_init does nothing when machine is not in kwargs.""" + from statemachine.io.scxml.actions import create_invoke_init_callable + + callback = create_invoke_init_callable() + # Call without machine kwarg — should not raise + callback() + + +class TestInvokeCallableWrapperRunInstance: + def test_run_with_instance_not_class(self): + """_InvokeCallableWrapper.run() works with an instance (not a class).""" + from statemachine.invoke import _InvokeCallableWrapper + + class Handler: + def run(self, ctx): + return "result" + + handler_instance = Handler() + wrapper = _InvokeCallableWrapper(handler_instance) + assert not wrapper._is_class + + ctx = Mock() + result = wrapper.run(ctx) + assert result == "result" + assert wrapper._instance is handler_instance + + +class TestOrderedSetStr: + def test_str_representation(self): + """OrderedSet.__str__ returns a set-like string.""" + from statemachine.orderedset import OrderedSet + + os = OrderedSet([1, 2, 3]) + assert str(os) == "{1, 2, 3}" From 446ddb1e76247c050668289b319c725b2c547caa Mon Sep 17 00:00:00 2001 From: Fernando Macedo Date: Sat, 21 Feb 2026 00:27:40 -0300 Subject: [PATCH 6/6] refactor(parser): remove dead namespaced find in parse_invoke The `child.find("{http://www.w3.org/2005/07/scxml}scxml")` call was unreachable because `strip_namespaces()` always runs before `parse_invoke()`. Replaced with direct `child.find("scxml")`. Also added a test for unknown child elements inside ``. --- statemachine/io/scxml/parser.py | 6 ++---- tests/test_scxml_units.py | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/statemachine/io/scxml/parser.py b/statemachine/io/scxml/parser.py index 5fb482e0..229914aa 100644 --- a/statemachine/io/scxml/parser.py +++ b/statemachine/io/scxml/parser.py @@ -442,10 +442,8 @@ def parse_invoke(element: ET.Element) -> InvokeDefinition: location = child.attrib.get("location") params.append(Param(name=name, expr=expr, location=location)) elif child.tag == "content": - # Check for inline element - scxml_child = child.find("{http://www.w3.org/2005/07/scxml}scxml") - if scxml_child is None: - scxml_child = child.find("scxml") + # Check for inline element (namespaces already stripped) + scxml_child = child.find("scxml") if scxml_child is not None: # Serialize the inline SCXML back to string for later parsing content = ET.tostring(scxml_child, encoding="unicode") diff --git a/tests/test_scxml_units.py b/tests/test_scxml_units.py index e3f0e7b3..31287403 100644 --- a/tests/test_scxml_units.py +++ b/tests/test_scxml_units.py @@ -899,6 +899,22 @@ def test_invoke_with_inline_scxml_no_namespace(self): invoke_def = definition.states["s1"].invocations[0] assert " are silently ignored.""" + scxml = """ + + + + + + + + + """ + definition = parse_scxml(scxml) + invoke_def = definition.states["s1"].invocations[0] + assert len(invoke_def.params) == 1 + def test_invoke_with_empty_content(self): """ with empty results in content=None.""" scxml = """